Merge pull request #10377 from taosdata/feature/3.0Trans
refactor trans code
This commit is contained in:
commit
3d02c75895
|
@ -78,6 +78,9 @@ typedef struct SRpcInit {
|
|||
// call back to retrieve the client auth info, for server app only
|
||||
int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
|
||||
|
||||
// call back to keep conn or not
|
||||
bool (*pfp)(void *parent, tmsg_t msgType);
|
||||
|
||||
void *parent;
|
||||
} SRpcInit;
|
||||
|
||||
|
|
|
@ -20,17 +20,17 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "taos.h"
|
||||
#include "common.h"
|
||||
#include "tmsg.h"
|
||||
#include "parser.h"
|
||||
#include "query.h"
|
||||
#include "taos.h"
|
||||
#include "tdef.h"
|
||||
#include "tep.h"
|
||||
#include "thash.h"
|
||||
#include "tlist.h"
|
||||
#include "tmsg.h"
|
||||
#include "tmsgtype.h"
|
||||
#include "trpc.h"
|
||||
#include "query.h"
|
||||
#include "parser.h"
|
||||
|
||||
#include "config.h"
|
||||
|
||||
|
@ -48,12 +48,12 @@ extern "C" {
|
|||
typedef struct SAppInstInfo SAppInstInfo;
|
||||
|
||||
typedef struct SHbConnInfo {
|
||||
void *param;
|
||||
SClientHbReq *req;
|
||||
void* param;
|
||||
SClientHbReq* req;
|
||||
} SHbConnInfo;
|
||||
|
||||
typedef struct SAppHbMgr {
|
||||
char *key;
|
||||
char* key;
|
||||
// statistics
|
||||
int32_t reportCnt;
|
||||
int32_t connKeyCnt;
|
||||
|
@ -64,15 +64,13 @@ typedef struct SAppHbMgr {
|
|||
// connection
|
||||
SAppInstInfo* pAppInstInfo;
|
||||
// info
|
||||
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
|
||||
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
|
||||
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
|
||||
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
|
||||
} SAppHbMgr;
|
||||
|
||||
typedef int32_t (*FHbRspHandle)(struct SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
|
||||
|
||||
typedef int32_t (*FHbRspHandle)(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp);
|
||||
|
||||
typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req);
|
||||
|
||||
typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req);
|
||||
|
||||
typedef struct SClientHbMgr {
|
||||
int8_t inited;
|
||||
|
@ -85,63 +83,62 @@ typedef struct SClientHbMgr {
|
|||
FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX];
|
||||
} SClientHbMgr;
|
||||
|
||||
|
||||
typedef struct SQueryExecMetric {
|
||||
int64_t start; // start timestamp
|
||||
int64_t parsed; // start to parse
|
||||
int64_t send; // start to send to server
|
||||
int64_t rsp; // receive response from server
|
||||
int64_t start; // start timestamp
|
||||
int64_t parsed; // start to parse
|
||||
int64_t send; // start to send to server
|
||||
int64_t rsp; // receive response from server
|
||||
} SQueryExecMetric;
|
||||
|
||||
typedef struct SInstanceSummary {
|
||||
uint64_t numOfInsertsReq;
|
||||
uint64_t numOfInsertRows;
|
||||
uint64_t insertElapsedTime;
|
||||
uint64_t insertBytes; // submit to tsdb since launched.
|
||||
uint64_t numOfInsertsReq;
|
||||
uint64_t numOfInsertRows;
|
||||
uint64_t insertElapsedTime;
|
||||
uint64_t insertBytes; // submit to tsdb since launched.
|
||||
|
||||
uint64_t fetchBytes;
|
||||
uint64_t queryElapsedTime;
|
||||
uint64_t numOfSlowQueries;
|
||||
uint64_t totalRequests;
|
||||
uint64_t currentRequests; // the number of SRequestObj
|
||||
uint64_t fetchBytes;
|
||||
uint64_t queryElapsedTime;
|
||||
uint64_t numOfSlowQueries;
|
||||
uint64_t totalRequests;
|
||||
uint64_t currentRequests; // the number of SRequestObj
|
||||
} SInstanceSummary;
|
||||
|
||||
typedef struct SHeartBeatInfo {
|
||||
void *pTimer; // timer, used to send request msg to mnode
|
||||
void* pTimer; // timer, used to send request msg to mnode
|
||||
} SHeartBeatInfo;
|
||||
|
||||
struct SAppInstInfo {
|
||||
int64_t numOfConns;
|
||||
SCorEpSet mgmtEp;
|
||||
SInstanceSummary summary;
|
||||
SList *pConnList; // STscObj linked list
|
||||
int64_t clusterId;
|
||||
void *pTransporter;
|
||||
struct SAppHbMgr *pAppHbMgr;
|
||||
int64_t numOfConns;
|
||||
SCorEpSet mgmtEp;
|
||||
SInstanceSummary summary;
|
||||
SList* pConnList; // STscObj linked list
|
||||
int64_t clusterId;
|
||||
void* pTransporter;
|
||||
struct SAppHbMgr* pAppHbMgr;
|
||||
};
|
||||
|
||||
typedef struct SAppInfo {
|
||||
int64_t startTime;
|
||||
char appName[TSDB_APP_NAME_LEN];
|
||||
char *ep;
|
||||
char* ep;
|
||||
int32_t pid;
|
||||
int32_t numOfThreads;
|
||||
SHashObj *pInstMap;
|
||||
SHashObj* pInstMap;
|
||||
pthread_mutex_t mutex;
|
||||
} SAppInfo;
|
||||
|
||||
typedef struct STscObj {
|
||||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_PASSWORD_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
char ver[128];
|
||||
int32_t acctId;
|
||||
uint32_t connId;
|
||||
int32_t connType;
|
||||
uint64_t id; // ref ID returned by taosAddRef
|
||||
pthread_mutex_t mutex; // used to protect the operation on db
|
||||
int32_t numOfReqs; // number of sqlObj bound to this connection
|
||||
SAppInstInfo *pAppInfo;
|
||||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_PASSWORD_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
char ver[128];
|
||||
int32_t acctId;
|
||||
uint32_t connId;
|
||||
int32_t connType;
|
||||
uint64_t id; // ref ID returned by taosAddRef
|
||||
pthread_mutex_t mutex; // used to protect the operation on db
|
||||
int32_t numOfReqs; // number of sqlObj bound to this connection
|
||||
SAppInstInfo* pAppInfo;
|
||||
} STscObj;
|
||||
|
||||
typedef struct SMqConsumer {
|
||||
|
@ -149,49 +146,49 @@ typedef struct SMqConsumer {
|
|||
} SMqConsumer;
|
||||
|
||||
typedef struct SReqResultInfo {
|
||||
const char *pRspMsg;
|
||||
const char *pData;
|
||||
TAOS_FIELD *fields;
|
||||
uint32_t numOfCols;
|
||||
int32_t *length;
|
||||
TAOS_ROW row;
|
||||
char **pCol;
|
||||
uint32_t numOfRows;
|
||||
uint64_t totalRows;
|
||||
uint32_t current;
|
||||
bool completed;
|
||||
const char* pRspMsg;
|
||||
const char* pData;
|
||||
TAOS_FIELD* fields;
|
||||
uint32_t numOfCols;
|
||||
int32_t* length;
|
||||
TAOS_ROW row;
|
||||
char** pCol;
|
||||
uint32_t numOfRows;
|
||||
uint64_t totalRows;
|
||||
uint32_t current;
|
||||
bool completed;
|
||||
} SReqResultInfo;
|
||||
|
||||
typedef struct SShowReqInfo {
|
||||
int64_t execId; // showId/queryId
|
||||
int32_t vgId;
|
||||
SArray *pArray; // SArray<SVgroupInfo>
|
||||
int32_t currentIndex; // current accessed vgroup index.
|
||||
int64_t execId; // showId/queryId
|
||||
int32_t vgId;
|
||||
SArray* pArray; // SArray<SVgroupInfo>
|
||||
int32_t currentIndex; // current accessed vgroup index.
|
||||
} SShowReqInfo;
|
||||
|
||||
typedef struct SRequestSendRecvBody {
|
||||
tsem_t rspSem; // not used now
|
||||
tsem_t rspSem; // not used now
|
||||
void* fp;
|
||||
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
|
||||
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
|
||||
SDataBuf requestMsg;
|
||||
struct SSchJob *pQueryJob; // query job, created according to sql query DAG.
|
||||
struct SQueryDag *pDag; // the query dag, generated according to the sql statement.
|
||||
struct SSchJob* pQueryJob; // query job, created according to sql query DAG.
|
||||
struct SQueryDag* pDag; // the query dag, generated according to the sql statement.
|
||||
SReqResultInfo resInfo;
|
||||
} SRequestSendRecvBody;
|
||||
|
||||
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
|
||||
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
|
||||
|
||||
typedef struct SRequestObj {
|
||||
uint64_t requestId;
|
||||
int32_t type; // request type
|
||||
STscObj *pTscObj;
|
||||
char *sqlstr; // sql string
|
||||
int32_t sqlLen;
|
||||
int64_t self;
|
||||
char *msgBuf;
|
||||
void *pInfo; // sql parse info, generated by parser module
|
||||
int32_t code;
|
||||
SQueryExecMetric metric;
|
||||
uint64_t requestId;
|
||||
int32_t type; // request type
|
||||
STscObj* pTscObj;
|
||||
char* sqlstr; // sql string
|
||||
int32_t sqlLen;
|
||||
int64_t self;
|
||||
char* msgBuf;
|
||||
void* pInfo; // sql parse info, generated by parser module
|
||||
int32_t code;
|
||||
SQueryExecMetric metric;
|
||||
SRequestSendRecvBody body;
|
||||
} SRequestObj;
|
||||
|
||||
|
@ -200,51 +197,52 @@ extern int32_t clientReqRefPool;
|
|||
extern int32_t clientConnRefPool;
|
||||
|
||||
extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
|
||||
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
|
||||
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
|
||||
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
|
||||
|
||||
int taos_init();
|
||||
int taos_init();
|
||||
|
||||
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo);
|
||||
void destroyTscObj(void*pObj);
|
||||
void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo);
|
||||
void destroyTscObj(void* pObj);
|
||||
|
||||
uint64_t generateRequestId();
|
||||
|
||||
void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
|
||||
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
|
||||
void destroyRequest(SRequestObj* pRequest);
|
||||
|
||||
char *getDbOfConnection(STscObj* pObj);
|
||||
char* getDbOfConnection(STscObj* pObj);
|
||||
void setConnectionDB(STscObj* pTscObj, const char* db);
|
||||
|
||||
void taos_init_imp(void);
|
||||
int taos_options_imp(TSDB_OPTION option, const char *str);
|
||||
int taos_options_imp(TSDB_OPTION option, const char* str);
|
||||
|
||||
void* openTransporter(const char *user, const char *auth, int32_t numOfThreads);
|
||||
void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
|
||||
|
||||
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
|
||||
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
|
||||
|
||||
void initMsgHandleFp();
|
||||
|
||||
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
|
||||
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
uint16_t port);
|
||||
|
||||
void *doFetchRow(SRequestObj* pRequest);
|
||||
void* doFetchRow(SRequestObj* pRequest);
|
||||
|
||||
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
|
||||
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
|
||||
|
||||
|
||||
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest);
|
||||
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
||||
|
||||
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery);
|
||||
|
||||
// --- heartbeat
|
||||
// --- heartbeat
|
||||
// global, called by mgmt
|
||||
int hbMgrInit();
|
||||
void hbMgrCleanUp();
|
||||
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
|
||||
|
||||
// cluster level
|
||||
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key);
|
||||
void appHbMgrCleanup(void);
|
||||
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
|
||||
void appHbMgrCleanup(void);
|
||||
|
||||
// conn level
|
||||
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType);
|
||||
|
@ -255,6 +253,7 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
|
|||
// --- mq
|
||||
void hbMgrInitMqHbRspHandle();
|
||||
|
||||
|
||||
// config
|
||||
int32_t tscInitLog(const char *cfgDir, const char *envFile, const char *apolloUrl);
|
||||
int32_t tscInitCfg(const char *cfgDir, const char *envFile, const char *apolloUrl);
|
||||
|
|
|
@ -13,15 +13,15 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "catalog.h"
|
||||
#include "clientInt.h"
|
||||
#include "clientLog.h"
|
||||
#include "os.h"
|
||||
#include "query.h"
|
||||
#include "scheduler.h"
|
||||
#include "tmsg.h"
|
||||
#include "tcache.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmsg.h"
|
||||
#include "tnote.h"
|
||||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
|
@ -29,16 +29,16 @@
|
|||
#include "ttimezone.h"
|
||||
|
||||
#define TSC_VAR_NOT_RELEASE 1
|
||||
#define TSC_VAR_RELEASED 0
|
||||
#define TSC_VAR_RELEASED 0
|
||||
|
||||
SAppInfo appInfo;
|
||||
int32_t clientReqRefPool = -1;
|
||||
int32_t clientConnRefPool = -1;
|
||||
SAppInfo appInfo;
|
||||
int32_t clientReqRefPool = -1;
|
||||
int32_t clientConnRefPool = -1;
|
||||
|
||||
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
|
||||
volatile int32_t tscInitRes = 0;
|
||||
volatile int32_t tscInitRes = 0;
|
||||
|
||||
static void registerRequest(SRequestObj* pRequest) {
|
||||
static void registerRequest(SRequestObj *pRequest) {
|
||||
STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id);
|
||||
assert(pTscObj != NULL);
|
||||
|
||||
|
@ -52,44 +52,49 @@ static void registerRequest(SRequestObj* pRequest) {
|
|||
|
||||
int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1);
|
||||
int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1);
|
||||
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%"PRIx64, pRequest->self,
|
||||
pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
||||
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
|
||||
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
||||
}
|
||||
}
|
||||
|
||||
static void deregisterRequest(SRequestObj* pRequest) {
|
||||
static void deregisterRequest(SRequestObj *pRequest) {
|
||||
assert(pRequest != NULL);
|
||||
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SInstanceSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
STscObj * pTscObj = pRequest->pTscObj;
|
||||
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
|
||||
int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1);
|
||||
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
|
||||
|
||||
int64_t duration = taosGetTimestampMs() - pRequest->metric.start;
|
||||
tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", reqId:0x%"PRIx64" elapsed:%"PRIu64" ms, current:%d, app current:%d", pRequest->self, pTscObj->id,
|
||||
pRequest->requestId, duration, num, currentInst);
|
||||
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
|
||||
" ms, current:%d, app current:%d",
|
||||
pRequest->self, pTscObj->id, pRequest->requestId, duration, num, currentInst);
|
||||
taosReleaseRef(clientConnRefPool, pTscObj->id);
|
||||
}
|
||||
|
||||
|
||||
|
||||
// todo close the transporter properly
|
||||
void closeTransporter(STscObj* pTscObj) {
|
||||
void closeTransporter(STscObj *pTscObj) {
|
||||
if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id);
|
||||
tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id);
|
||||
rpcClose(pTscObj->pAppInfo->pTransporter);
|
||||
}
|
||||
|
||||
// TODO refactor
|
||||
void* openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
||||
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
||||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = 0;
|
||||
rpcInit.label = "TSC";
|
||||
rpcInit.numOfThreads = numOfThread;
|
||||
rpcInit.cfp = processMsgFromServer;
|
||||
rpcInit.pfp = persistConnForSpecificMsg;
|
||||
rpcInit.sessions = cfgGetItem(tscCfg, "maxConnections")->i32;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit.user = (char *)user;
|
||||
|
@ -98,7 +103,7 @@ void* openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
|||
rpcInit.spi = 1;
|
||||
rpcInit.secret = (char *)auth;
|
||||
|
||||
void* pDnodeConn = rpcOpen(&rpcInit);
|
||||
void *pDnodeConn = rpcOpen(&rpcInit);
|
||||
if (pDnodeConn == NULL) {
|
||||
tscError("failed to init connection to server");
|
||||
return NULL;
|
||||
|
@ -113,12 +118,12 @@ void destroyTscObj(void *pObj) {
|
|||
SClientHbKey connKey = {.connId = pTscObj->connId, .hbType = pTscObj->connType};
|
||||
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
|
||||
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||
tscDebug("connObj 0x%"PRIx64" destroyed, totalConn:%"PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
|
||||
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
|
||||
pthread_mutex_destroy(&pTscObj->mutex);
|
||||
tfree(pTscObj);
|
||||
}
|
||||
|
||||
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo) {
|
||||
void *createTscObj(const char *user, const char *auth, const char *db, SAppInstInfo *pAppInfo) {
|
||||
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
|
||||
if (NULL == pObj) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
@ -136,11 +141,11 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI
|
|||
pthread_mutex_init(&pObj->mutex, NULL);
|
||||
pObj->id = taosAddRef(clientConnRefPool, pObj);
|
||||
|
||||
tscDebug("connObj created, 0x%"PRIx64, pObj->id);
|
||||
tscDebug("connObj created, 0x%" PRIx64, pObj->id);
|
||||
return pObj;
|
||||
}
|
||||
|
||||
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type) {
|
||||
void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) {
|
||||
assert(pObj != NULL);
|
||||
|
||||
SRequestObj *pRequest = (SRequestObj *)calloc(1, sizeof(SRequestObj));
|
||||
|
@ -149,20 +154,20 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pRequest->requestId = generateRequestId();
|
||||
pRequest->requestId = generateRequestId();
|
||||
pRequest->metric.start = taosGetTimestampMs();
|
||||
|
||||
pRequest->type = type;
|
||||
pRequest->pTscObj = pObj;
|
||||
pRequest->body.fp = fp; // not used it yet
|
||||
pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
|
||||
pRequest->type = type;
|
||||
pRequest->pTscObj = pObj;
|
||||
pRequest->body.fp = fp; // not used it yet
|
||||
pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
|
||||
tsem_init(&pRequest->body.rspSem, 0, 0);
|
||||
|
||||
registerRequest(pRequest);
|
||||
return pRequest;
|
||||
}
|
||||
|
||||
static void doFreeReqResultInfo(SReqResultInfo* pResInfo) {
|
||||
static void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
|
||||
tfree(pResInfo->pRspMsg);
|
||||
tfree(pResInfo->length);
|
||||
tfree(pResInfo->row);
|
||||
|
@ -170,9 +175,9 @@ static void doFreeReqResultInfo(SReqResultInfo* pResInfo) {
|
|||
tfree(pResInfo->fields);
|
||||
}
|
||||
|
||||
static void doDestroyRequest(void* p) {
|
||||
static void doDestroyRequest(void *p) {
|
||||
assert(p != NULL);
|
||||
SRequestObj* pRequest = (SRequestObj*)p;
|
||||
SRequestObj *pRequest = (SRequestObj *)p;
|
||||
|
||||
assert(RID_VALID(pRequest->self));
|
||||
|
||||
|
@ -191,7 +196,7 @@ static void doDestroyRequest(void* p) {
|
|||
tfree(pRequest);
|
||||
}
|
||||
|
||||
void destroyRequest(SRequestObj* pRequest) {
|
||||
void destroyRequest(SRequestObj *pRequest) {
|
||||
if (pRequest == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -237,14 +242,14 @@ void taos_init_imp(void) {
|
|||
initTaskQueue();
|
||||
|
||||
clientConnRefPool = taosOpenRef(200, destroyTscObj);
|
||||
clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
|
||||
clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
|
||||
|
||||
taosGetAppName(appInfo.appName, NULL);
|
||||
pthread_mutex_init(&appInfo.mutex, NULL);
|
||||
|
||||
appInfo.pid = taosGetPId();
|
||||
appInfo.pid = taosGetPId();
|
||||
appInfo.startTime = taosGetTimestampMs();
|
||||
appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
tscDebug("client is initialized successfully");
|
||||
}
|
||||
|
||||
|
@ -267,7 +272,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
|
||||
tscInfo("set config file directory:%s", str);
|
||||
} else {
|
||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
|
||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
|
||||
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -282,7 +288,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
|
||||
tscInfo("set shellActivityTimer:%d", tsShellActivityTimer);
|
||||
} else {
|
||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr);
|
||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, str,
|
||||
tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr);
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -299,8 +306,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
|
||||
char sep = '.';
|
||||
|
||||
if (strlen(tsLocale) == 0) { // locale does not set yet
|
||||
char* defaultLocale = setlocale(LC_CTYPE, "");
|
||||
if (strlen(tsLocale) == 0) { // locale does not set yet
|
||||
char *defaultLocale = setlocale(LC_CTYPE, "");
|
||||
|
||||
// The locale of the current OS does not be set correctly, so the default locale cannot be acquired.
|
||||
// The launch of current system will abort soon.
|
||||
|
@ -315,10 +322,10 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
// set the user specified locale
|
||||
char *locale = setlocale(LC_CTYPE, str);
|
||||
|
||||
if (locale != NULL) { // failed to set the user specified locale
|
||||
if (locale != NULL) { // failed to set the user specified locale
|
||||
tscInfo("locale set, prev locale:%s, new locale:%s", tsLocale, locale);
|
||||
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
|
||||
} else { // set the user specified locale failed, use default LC_CTYPE as current locale
|
||||
} else { // set the user specified locale failed, use default LC_CTYPE as current locale
|
||||
locale = setlocale(LC_CTYPE, tsLocale);
|
||||
tscInfo("failed to set locale:%s, current locale:%s", str, tsLocale);
|
||||
}
|
||||
|
@ -346,11 +353,12 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
}
|
||||
|
||||
free(charset);
|
||||
} else { // it may be windows system
|
||||
} else { // it may be windows system
|
||||
tscInfo("charset remains:%s", tsCharset);
|
||||
}
|
||||
} else {
|
||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
|
||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
|
||||
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -380,7 +388,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
tscInfo("charset:%s not valid", str);
|
||||
}
|
||||
} else {
|
||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
|
||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
|
||||
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -396,7 +405,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
|
||||
tscDebug("timezone set:%s, input:%s by taos_options", tsTimezone, str);
|
||||
} else {
|
||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
|
||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
|
||||
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -420,7 +430,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
*/
|
||||
uint64_t generateRequestId() {
|
||||
static uint64_t hashId = 0;
|
||||
static int32_t requestSerialId = 0;
|
||||
static int32_t requestSerialId = 0;
|
||||
|
||||
if (hashId == 0) {
|
||||
char uid[64] = {0};
|
||||
|
@ -434,9 +444,9 @@ uint64_t generateRequestId() {
|
|||
}
|
||||
}
|
||||
|
||||
int64_t ts = taosGetTimestampMs();
|
||||
uint64_t pid = taosGetPId();
|
||||
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
|
||||
int64_t ts = taosGetTimestampMs();
|
||||
uint64_t pid = taosGetPId();
|
||||
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
|
||||
|
||||
uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
|
||||
return id;
|
||||
|
|
|
@ -366,7 +366,6 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
|
|||
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
|
||||
pMsgSendInfo->param = pRequest;
|
||||
|
||||
|
||||
SConnectReq connectReq = {0};
|
||||
STscObj* pObj = pRequest->pTscObj;
|
||||
|
||||
|
@ -394,7 +393,9 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
|
|||
tfree(pMsgBody->msgInfo.pData);
|
||||
tfree(pMsgBody);
|
||||
}
|
||||
|
||||
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
|
||||
return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP;
|
||||
}
|
||||
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
|
||||
assert(pMsg->ahandle != NULL);
|
||||
|
|
|
@ -66,6 +66,7 @@ typedef struct {
|
|||
|
||||
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
||||
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
|
||||
bool (*pfp)(void* parent, tmsg_t msgType);
|
||||
|
||||
int32_t refCount;
|
||||
void* parent;
|
||||
|
|
|
@ -29,7 +29,12 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
if (pInit->label) {
|
||||
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1);
|
||||
}
|
||||
|
||||
// register callback handle
|
||||
pRpc->cfp = pInit->cfp;
|
||||
pRpc->afp = pInit->afp;
|
||||
pRpc->pfp = pInit->pfp;
|
||||
|
||||
if (pInit->connType == TAOS_CONN_SERVER) {
|
||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||
} else {
|
||||
|
|
|
@ -134,8 +134,7 @@ static void clientHandleResp(SCliConn* conn) {
|
|||
rpcMsg.msgType = pHead->msgType;
|
||||
rpcMsg.ahandle = pCtx->ahandle;
|
||||
|
||||
if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP ||
|
||||
rpcMsg.msgType == TDMT_VND_RES_READY_RSP) {
|
||||
if (pRpc->pfp != NULL && (pRpc->pfp)(pRpc->parent, rpcMsg.msgType)) {
|
||||
rpcMsg.handle = conn;
|
||||
conn->persist = 1;
|
||||
tDebug("client conn %p persist by app", conn);
|
||||
|
@ -185,18 +184,13 @@ static void clientHandleExcept(SCliConn* pConn) {
|
|||
clientConnDestroy(pConn, true);
|
||||
return;
|
||||
}
|
||||
SCliMsg* pMsg = pConn->data;
|
||||
|
||||
tmsg_t msgType = TDMT_MND_CONNECT;
|
||||
if (pMsg != NULL) {
|
||||
msgType = pMsg->msg.msgType;
|
||||
}
|
||||
SCliMsg* pMsg = pConn->data;
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.ahandle = pCtx->ahandle;
|
||||
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
rpcMsg.msgType = msgType + 1;
|
||||
rpcMsg.msgType = pMsg->msg.msgType + 1;
|
||||
|
||||
if (pConn->push != NULL && pConn->ctnRdCnt != 0) {
|
||||
(*pConn->push->callback)(pConn->push->arg, &rpcMsg);
|
||||
|
@ -445,7 +439,7 @@ static void clientConnCb(uv_connect_t* req, int status) {
|
|||
addrlen = sizeof(pConn->locaddr);
|
||||
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
|
||||
|
||||
tTrace("client conn %p create", pConn);
|
||||
tTrace("client conn %p connect to server successfully", pConn);
|
||||
|
||||
assert(pConn->stream == req->handle);
|
||||
clientWrite(pConn);
|
||||
|
@ -524,6 +518,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
struct sockaddr_in addr;
|
||||
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
||||
// handle error in callback if fail to connect
|
||||
tTrace("client conn %p try to connect to %s:%d", conn, pMsg->ctx->ip, pMsg->ctx->port);
|
||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
|
||||
}
|
||||
|
||||
|
|
|
@ -413,11 +413,6 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
|||
} else {
|
||||
uvStartSendResp(msg);
|
||||
}
|
||||
// uv_buf_t wb;
|
||||
// uvPrepareSendData(msg, &wb);
|
||||
// uv_timer_stop(conn->pTimer);
|
||||
|
||||
// uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
|
||||
}
|
||||
}
|
||||
static void uvAcceptAsyncCb(uv_async_t* async) {
|
||||
|
@ -490,7 +485,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
|||
pConn->pTimer->data = pConn;
|
||||
|
||||
pConn->hostThrd = pThrd;
|
||||
// pConn->pWorkerAsync = pThrd->workerAsync; // thread safty
|
||||
|
||||
// init client handle
|
||||
pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
|
||||
|
@ -730,14 +724,9 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
|||
}
|
||||
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
||||
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
|
||||
|
||||
// pthread_mutex_lock(&pThrd->msgMtx);
|
||||
// QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
|
||||
// pthread_mutex_unlock(&pThrd->msgMtx);
|
||||
tDebug("send quit msg to work thread");
|
||||
|
||||
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
||||
// uv_async_send(pThrd->workerAsync);
|
||||
}
|
||||
|
||||
void taosCloseServer(void* arg) {
|
||||
|
@ -774,19 +763,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
|
|||
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
|
||||
srvMsg->pConn = pConn;
|
||||
srvMsg->msg = *pMsg;
|
||||
|
||||
// pthread_mutex_lock(&pThrd->msgMtx);
|
||||
// QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
|
||||
// pthread_mutex_unlock(&pThrd->msgMtx);
|
||||
|
||||
tTrace("server conn %p start to send resp", pConn);
|
||||
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
||||
// uv_async_send(pThrd->workerAsync);
|
||||
}
|
||||
|
||||
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) {
|
||||
SSrvConn* pConn = thandle;
|
||||
// struct sockaddr* pPeerName = &pConn->peername;
|
||||
|
||||
struct sockaddr_in addr = pConn->addr;
|
||||
pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
|
||||
|
|
Loading…
Reference in New Issue