support multiple UDP threads
change the mutex in connection cache to home made lock
This commit is contained in:
parent
3eccf4d22f
commit
f74ad86c54
|
@ -29,8 +29,8 @@ extern "C" {
|
||||||
extern int tsRpcHeadSize;
|
extern int tsRpcHeadSize;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int16_t index;
|
int8_t inUse;
|
||||||
int16_t numOfIps;
|
int8_t numOfIps;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
uint32_t ip[TSDB_MAX_MPEERS];
|
uint32_t ip[TSDB_MAX_MPEERS];
|
||||||
} SRpcIpSet;
|
} SRpcIpSet;
|
||||||
|
@ -43,13 +43,13 @@ typedef struct {
|
||||||
} SRpcConnInfo;
|
} SRpcConnInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char *localIp; // local IP used
|
char *localIp; // local IP used
|
||||||
uint16_t localPort; // local port
|
uint16_t localPort; // local port
|
||||||
char *label; // for debug purpose
|
char *label; // for debug purpose
|
||||||
int numOfThreads; // number of threads to handle connections
|
int numOfThreads; // number of threads to handle connections
|
||||||
int sessions; // number of sessions allowed
|
int sessions; // number of sessions allowed
|
||||||
int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
|
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
|
||||||
int idleTime; // milliseconds, 0 means idle timer is disabled
|
int idleTime; // milliseconds, 0 means idle timer is disabled
|
||||||
|
|
||||||
// the following is for client app ecurity only
|
// the following is for client app ecurity only
|
||||||
char *user; // user name
|
char *user; // user name
|
||||||
|
@ -72,6 +72,7 @@ void *rpcOpen(SRpcInit *pRpc);
|
||||||
void rpcClose(void *);
|
void rpcClose(void *);
|
||||||
void *rpcMallocCont(int contLen);
|
void *rpcMallocCont(int contLen);
|
||||||
void rpcFreeCont(void *pCont);
|
void rpcFreeCont(void *pCont);
|
||||||
|
void *rpcReallocCont(void *ptr, int contLen);
|
||||||
void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle);
|
void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle);
|
||||||
void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen);
|
void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen);
|
||||||
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
|
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
|
||||||
|
|
|
@ -22,8 +22,8 @@ extern "C" {
|
||||||
|
|
||||||
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
|
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
|
||||||
void rpcCloseConnCache(void *handle);
|
void rpcCloseConnCache(void *handle);
|
||||||
void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user);
|
void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, int8_t connType);
|
||||||
void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user);
|
void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connType);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,6 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include "tglobalcfg.h"
|
#include "tglobalcfg.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tmempool.h"
|
#include "tmempool.h"
|
||||||
|
@ -26,6 +25,7 @@
|
||||||
typedef struct _c_hash_t {
|
typedef struct _c_hash_t {
|
||||||
uint32_t ip;
|
uint32_t ip;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
|
char connType;
|
||||||
struct _c_hash_t *prev;
|
struct _c_hash_t *prev;
|
||||||
struct _c_hash_t *next;
|
struct _c_hash_t *next;
|
||||||
void * data;
|
void * data;
|
||||||
|
@ -43,160 +43,14 @@ typedef struct {
|
||||||
void (*cleanFp)(void *);
|
void (*cleanFp)(void *);
|
||||||
void *tmrCtrl;
|
void *tmrCtrl;
|
||||||
void *pTimer;
|
void *pTimer;
|
||||||
|
int64_t *lockedBy;
|
||||||
} SConnCache;
|
} SConnCache;
|
||||||
|
|
||||||
int rpcHashConn(void *handle, uint32_t ip, uint16_t port, char *user) {
|
static int rpcHashConn(void *handle, uint32_t ip, uint16_t port, int8_t connType);
|
||||||
SConnCache *pCache = (SConnCache *)handle;
|
static void rpcLockCache(int64_t *lockedBy);
|
||||||
int hash = 0;
|
static void rpcUnlockCache(int64_t *lockedBy);
|
||||||
// size_t user_len = strlen(user);
|
static void rpcCleanConnCache(void *handle, void *tmrId);
|
||||||
|
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time);
|
||||||
hash = ip >> 16;
|
|
||||||
hash += (unsigned short)(ip & 0xFFFF);
|
|
||||||
hash += port;
|
|
||||||
while (*user != '\0') {
|
|
||||||
hash += *user;
|
|
||||||
user++;
|
|
||||||
}
|
|
||||||
|
|
||||||
hash = hash % pCache->maxSessions;
|
|
||||||
|
|
||||||
return hash;
|
|
||||||
}
|
|
||||||
|
|
||||||
void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
|
|
||||||
if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return;
|
|
||||||
|
|
||||||
SConnHash *pPrev = pNode->prev, *pNext;
|
|
||||||
|
|
||||||
while (pNode) {
|
|
||||||
(*pCache->cleanFp)(pNode->data);
|
|
||||||
pNext = pNode->next;
|
|
||||||
pCache->total--;
|
|
||||||
pCache->count[hash]--;
|
|
||||||
tTrace("%p ip:0x%x:%hu:%d:%p removed from cache, connections:%d", pNode->data, pNode->ip, pNode->port, hash, pNode,
|
|
||||||
pCache->count[hash]);
|
|
||||||
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
|
|
||||||
pNode = pNext;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pPrev)
|
|
||||||
pPrev->next = NULL;
|
|
||||||
else
|
|
||||||
pCache->connHashList[hash] = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) {
|
|
||||||
int hash;
|
|
||||||
SConnHash * pNode;
|
|
||||||
SConnCache *pCache;
|
|
||||||
|
|
||||||
uint64_t time = taosGetTimestampMs();
|
|
||||||
|
|
||||||
pCache = (SConnCache *)handle;
|
|
||||||
assert(pCache);
|
|
||||||
assert(data);
|
|
||||||
|
|
||||||
hash = rpcHashConn(pCache, ip, port, user);
|
|
||||||
pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
|
|
||||||
pNode->ip = ip;
|
|
||||||
pNode->port = port;
|
|
||||||
pNode->data = data;
|
|
||||||
pNode->prev = NULL;
|
|
||||||
pNode->time = time;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pCache->mutex);
|
|
||||||
|
|
||||||
pNode->next = pCache->connHashList[hash];
|
|
||||||
if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode;
|
|
||||||
pCache->connHashList[hash] = pNode;
|
|
||||||
|
|
||||||
pCache->total++;
|
|
||||||
pCache->count[hash]++;
|
|
||||||
rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pCache->mutex);
|
|
||||||
|
|
||||||
tTrace("%p ip:0x%x:%hu:%d:%p added into cache, connections:%d", data, ip, port, hash, pNode, pCache->count[hash]);
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
void rpcCleanConnCache(void *handle, void *tmrId) {
|
|
||||||
int hash;
|
|
||||||
SConnHash * pNode;
|
|
||||||
SConnCache *pCache;
|
|
||||||
|
|
||||||
pCache = (SConnCache *)handle;
|
|
||||||
if (pCache == NULL || pCache->maxSessions == 0) return;
|
|
||||||
if (pCache->pTimer != tmrId) return;
|
|
||||||
|
|
||||||
uint64_t time = taosGetTimestampMs();
|
|
||||||
|
|
||||||
for (hash = 0; hash < pCache->maxSessions; ++hash) {
|
|
||||||
pthread_mutex_lock(&pCache->mutex);
|
|
||||||
pNode = pCache->connHashList[hash];
|
|
||||||
rpcRemoveExpiredNodes(pCache, pNode, hash, time);
|
|
||||||
pthread_mutex_unlock(&pCache->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
// tTrace("timer, total connections in cache:%d", pCache->total);
|
|
||||||
taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer);
|
|
||||||
}
|
|
||||||
|
|
||||||
void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) {
|
|
||||||
int hash;
|
|
||||||
SConnHash * pNode;
|
|
||||||
SConnCache *pCache;
|
|
||||||
void * pData = NULL;
|
|
||||||
|
|
||||||
pCache = (SConnCache *)handle;
|
|
||||||
assert(pCache);
|
|
||||||
|
|
||||||
uint64_t time = taosGetTimestampMs();
|
|
||||||
|
|
||||||
hash = rpcHashConn(pCache, ip, port, user);
|
|
||||||
pthread_mutex_lock(&pCache->mutex);
|
|
||||||
|
|
||||||
pNode = pCache->connHashList[hash];
|
|
||||||
while (pNode) {
|
|
||||||
if (time >= pCache->keepTimer + pNode->time) {
|
|
||||||
rpcRemoveExpiredNodes(pCache, pNode, hash, time);
|
|
||||||
pNode = NULL;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pNode->ip == ip && pNode->port == port) break;
|
|
||||||
|
|
||||||
pNode = pNode->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pNode) {
|
|
||||||
rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
|
|
||||||
|
|
||||||
if (pNode->prev) {
|
|
||||||
pNode->prev->next = pNode->next;
|
|
||||||
} else {
|
|
||||||
pCache->connHashList[hash] = pNode->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pNode->next) {
|
|
||||||
pNode->next->prev = pNode->prev;
|
|
||||||
}
|
|
||||||
|
|
||||||
pData = pNode->data;
|
|
||||||
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
|
|
||||||
pCache->total--;
|
|
||||||
pCache->count[hash]--;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pCache->mutex);
|
|
||||||
|
|
||||||
if (pData) {
|
|
||||||
tTrace("%p ip:0x%x:%hu:%d:%p retrieved from cache, connections:%d", pData, ip, port, hash, pNode, pCache->count[hash]);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pData;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
|
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
|
||||||
SConnHash **connHashList;
|
SConnHash **connHashList;
|
||||||
|
@ -228,6 +82,7 @@ void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl,
|
||||||
pCache->connHashList = connHashList;
|
pCache->connHashList = connHashList;
|
||||||
pCache->cleanFp = cleanFp;
|
pCache->cleanFp = cleanFp;
|
||||||
pCache->tmrCtrl = tmrCtrl;
|
pCache->tmrCtrl = tmrCtrl;
|
||||||
|
pCache->lockedBy = calloc(sizeof(int64_t), maxSessions);
|
||||||
taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer);
|
taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer);
|
||||||
|
|
||||||
pthread_mutex_init(&pCache->mutex, NULL);
|
pthread_mutex_init(&pCache->mutex, NULL);
|
||||||
|
@ -250,10 +105,179 @@ void rpcCloseConnCache(void *handle) {
|
||||||
tfree(pCache->connHashList);
|
tfree(pCache->connHashList);
|
||||||
tfree(pCache->count)
|
tfree(pCache->count)
|
||||||
|
|
||||||
pthread_mutex_unlock(&pCache->mutex);
|
pthread_mutex_unlock(&pCache->mutex);
|
||||||
|
|
||||||
pthread_mutex_destroy(&pCache->mutex);
|
pthread_mutex_destroy(&pCache->mutex);
|
||||||
|
|
||||||
memset(pCache, 0, sizeof(SConnCache));
|
memset(pCache, 0, sizeof(SConnCache));
|
||||||
free(pCache);
|
free(pCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, int8_t connType) {
|
||||||
|
int hash;
|
||||||
|
SConnHash * pNode;
|
||||||
|
SConnCache *pCache;
|
||||||
|
|
||||||
|
uint64_t time = taosGetTimestampMs();
|
||||||
|
|
||||||
|
pCache = (SConnCache *)handle;
|
||||||
|
assert(pCache);
|
||||||
|
assert(data);
|
||||||
|
|
||||||
|
hash = rpcHashConn(pCache, ip, port, connType);
|
||||||
|
pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
|
||||||
|
pNode->ip = ip;
|
||||||
|
pNode->port = port;
|
||||||
|
pNode->connType = connType;
|
||||||
|
pNode->data = data;
|
||||||
|
pNode->prev = NULL;
|
||||||
|
pNode->time = time;
|
||||||
|
|
||||||
|
rpcLockCache(pCache->lockedBy+hash);
|
||||||
|
|
||||||
|
pNode->next = pCache->connHashList[hash];
|
||||||
|
if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode;
|
||||||
|
pCache->connHashList[hash] = pNode;
|
||||||
|
|
||||||
|
pCache->count[hash]++;
|
||||||
|
rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
|
||||||
|
|
||||||
|
rpcUnlockCache(pCache->lockedBy+hash);
|
||||||
|
|
||||||
|
pCache->total++;
|
||||||
|
|
||||||
|
tTrace("%p ip:0x%x:%hu:%d:%d:%p added into cache, connections:%d", data, ip, port, connType, hash, pNode, pCache->count[hash]);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connType) {
|
||||||
|
int hash;
|
||||||
|
SConnHash * pNode;
|
||||||
|
SConnCache *pCache;
|
||||||
|
void * pData = NULL;
|
||||||
|
|
||||||
|
pCache = (SConnCache *)handle;
|
||||||
|
assert(pCache);
|
||||||
|
|
||||||
|
uint64_t time = taosGetTimestampMs();
|
||||||
|
|
||||||
|
hash = rpcHashConn(pCache, ip, port, connType);
|
||||||
|
rpcLockCache(pCache->lockedBy+hash);
|
||||||
|
|
||||||
|
pNode = pCache->connHashList[hash];
|
||||||
|
while (pNode) {
|
||||||
|
if (time >= pCache->keepTimer + pNode->time) {
|
||||||
|
rpcRemoveExpiredNodes(pCache, pNode, hash, time);
|
||||||
|
pNode = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pNode->ip == ip && pNode->port == port && pNode->connType == connType) break;
|
||||||
|
|
||||||
|
pNode = pNode->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pNode) {
|
||||||
|
rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
|
||||||
|
|
||||||
|
if (pNode->prev) {
|
||||||
|
pNode->prev->next = pNode->next;
|
||||||
|
} else {
|
||||||
|
pCache->connHashList[hash] = pNode->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pNode->next) {
|
||||||
|
pNode->next->prev = pNode->prev;
|
||||||
|
}
|
||||||
|
|
||||||
|
pData = pNode->data;
|
||||||
|
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
|
||||||
|
pCache->total--;
|
||||||
|
pCache->count[hash]--;
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcUnlockCache(pCache->lockedBy+hash);
|
||||||
|
|
||||||
|
if (pData) {
|
||||||
|
tTrace("%p ip:0x%x:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, ip, port, connType, hash, pNode, pCache->count[hash]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pData;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void rpcCleanConnCache(void *handle, void *tmrId) {
|
||||||
|
int hash;
|
||||||
|
SConnHash * pNode;
|
||||||
|
SConnCache *pCache;
|
||||||
|
|
||||||
|
pCache = (SConnCache *)handle;
|
||||||
|
if (pCache == NULL || pCache->maxSessions == 0) return;
|
||||||
|
if (pCache->pTimer != tmrId) return;
|
||||||
|
|
||||||
|
uint64_t time = taosGetTimestampMs();
|
||||||
|
|
||||||
|
for (hash = 0; hash < pCache->maxSessions; ++hash) {
|
||||||
|
rpcLockCache(pCache->lockedBy+hash);
|
||||||
|
pNode = pCache->connHashList[hash];
|
||||||
|
rpcRemoveExpiredNodes(pCache, pNode, hash, time);
|
||||||
|
rpcUnlockCache(pCache->lockedBy+hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
// tTrace("timer, total connections in cache:%d", pCache->total);
|
||||||
|
taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
|
||||||
|
if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return;
|
||||||
|
|
||||||
|
SConnHash *pPrev = pNode->prev, *pNext;
|
||||||
|
|
||||||
|
while (pNode) {
|
||||||
|
(*pCache->cleanFp)(pNode->data);
|
||||||
|
pNext = pNode->next;
|
||||||
|
pCache->total--;
|
||||||
|
pCache->count[hash]--;
|
||||||
|
tTrace("%p ip:0x%x:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->ip, pNode->port, pNode->connType, hash, pNode,
|
||||||
|
pCache->count[hash]);
|
||||||
|
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
|
||||||
|
pNode = pNext;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pPrev)
|
||||||
|
pPrev->next = NULL;
|
||||||
|
else
|
||||||
|
pCache->connHashList[hash] = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int rpcHashConn(void *handle, uint32_t ip, uint16_t port, int8_t connType) {
|
||||||
|
SConnCache *pCache = (SConnCache *)handle;
|
||||||
|
int hash = 0;
|
||||||
|
|
||||||
|
hash = ip >> 16;
|
||||||
|
hash += (unsigned short)(ip & 0xFFFF);
|
||||||
|
hash += port;
|
||||||
|
hash += connType;
|
||||||
|
|
||||||
|
hash = hash % pCache->maxSessions;
|
||||||
|
|
||||||
|
return hash;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void rpcLockCache(int64_t *lockedBy) {
|
||||||
|
int64_t tid = taosGetPthreadId();
|
||||||
|
int i = 0;
|
||||||
|
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
|
||||||
|
if (++i % 100 == 0) {
|
||||||
|
sched_yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void rpcUnlockCache(int64_t *lockedBy) {
|
||||||
|
int64_t tid = taosGetPthreadId();
|
||||||
|
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
|
||||||
|
assert(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,12 +41,13 @@
|
||||||
#define rpcIsReq(type) (type & 1U)
|
#define rpcIsReq(type) (type & 1U)
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int sessions;
|
int sessions; // number of sessions allowed
|
||||||
int numOfThreads;
|
int numOfThreads; // number of threads to process incoming messages
|
||||||
int idleTime; // milliseconds;
|
int idleTime; // milliseconds;
|
||||||
char localIp[TSDB_IPv4ADDR_LEN];
|
char localIp[TSDB_IPv4ADDR_LEN];
|
||||||
uint16_t localPort;
|
uint16_t localPort;
|
||||||
int connType;
|
int8_t connType;
|
||||||
|
int index; // for UDP server only, round robin for multiple threads
|
||||||
char label[12];
|
char label[12];
|
||||||
|
|
||||||
char user[TSDB_UNI_LEN]; // meter ID
|
char user[TSDB_UNI_LEN]; // meter ID
|
||||||
|
@ -78,7 +79,7 @@ typedef struct {
|
||||||
int32_t contLen; // content length
|
int32_t contLen; // content length
|
||||||
int32_t code; // error code
|
int32_t code; // error code
|
||||||
int16_t numOfTry; // number of try for different servers
|
int16_t numOfTry; // number of try for different servers
|
||||||
int8_t oldIndex; // server IP index passed by app
|
int8_t oldInUse; // server IP inUse passed by app
|
||||||
int8_t redirect; // flag to indicate redirect
|
int8_t redirect; // flag to indicate redirect
|
||||||
int8_t connType; // connection type
|
int8_t connType; // connection type
|
||||||
char msg[0]; // RpcHead starts from here
|
char msg[0]; // RpcHead starts from here
|
||||||
|
@ -115,7 +116,7 @@ typedef struct _RpcConn {
|
||||||
char *pReqMsg; // request message including header
|
char *pReqMsg; // request message including header
|
||||||
int reqMsgLen; // request message length
|
int reqMsgLen; // request message length
|
||||||
SRpcInfo *pRpc; // the associated SRpcInfo
|
SRpcInfo *pRpc; // the associated SRpcInfo
|
||||||
int connType; // connection type
|
int8_t connType; // connection type
|
||||||
int64_t lockedBy; // lock for connection
|
int64_t lockedBy; // lock for connection
|
||||||
SRpcReqContext *pContext; // request context
|
SRpcReqContext *pContext; // request context
|
||||||
} SRpcConn;
|
} SRpcConn;
|
||||||
|
@ -172,8 +173,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort,
|
||||||
static void rpcCloseConn(void *thandle);
|
static void rpcCloseConn(void *thandle);
|
||||||
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext);
|
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext);
|
||||||
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
|
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
|
||||||
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr);
|
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
|
||||||
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr);
|
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
|
||||||
|
|
||||||
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
|
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
|
||||||
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
|
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
|
||||||
|
@ -207,8 +208,7 @@ void *rpcOpen(SRpcInit *pInit) {
|
||||||
if(pInit->label) strcpy(pRpc->label, pInit->label);
|
if(pInit->label) strcpy(pRpc->label, pInit->label);
|
||||||
pRpc->connType = pInit->connType;
|
pRpc->connType = pInit->connType;
|
||||||
pRpc->idleTime = pInit->idleTime;
|
pRpc->idleTime = pInit->idleTime;
|
||||||
// pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
|
pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
|
||||||
pRpc->numOfThreads = 1;
|
|
||||||
if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp);
|
if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp);
|
||||||
pRpc->localPort = pInit->localPort;
|
pRpc->localPort = pInit->localPort;
|
||||||
pRpc->afp = pInit->afp;
|
pRpc->afp = pInit->afp;
|
||||||
|
@ -331,7 +331,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in
|
||||||
pContext->contLen = contLen;
|
pContext->contLen = contLen;
|
||||||
pContext->pCont = pCont;
|
pContext->pCont = pCont;
|
||||||
pContext->msgType = type;
|
pContext->msgType = type;
|
||||||
pContext->oldIndex = pIpSet->index;
|
pContext->oldInUse = pIpSet->inUse;
|
||||||
|
|
||||||
pContext->connType = RPC_CONN_UDPC;
|
pContext->connType = RPC_CONN_UDPC;
|
||||||
if (contLen > 16000) pContext->connType = RPC_CONN_TCPC;
|
if (contLen > 16000) pContext->connType = RPC_CONN_TCPC;
|
||||||
|
@ -381,6 +381,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
|
||||||
pHead->sourceId = pConn->ownId;
|
pHead->sourceId = pConn->ownId;
|
||||||
pHead->destId = pConn->peerId;
|
pHead->destId = pConn->peerId;
|
||||||
pHead->uid = 0;
|
pHead->uid = 0;
|
||||||
|
pHead->port = htons(pConn->localPort);
|
||||||
pHead->code = htonl(code);
|
pHead->code = htonl(code);
|
||||||
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
|
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
|
||||||
|
|
||||||
|
@ -514,8 +515,12 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr) {
|
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
SRpcConn *pConn = NULL;
|
SRpcConn *pConn = NULL;
|
||||||
|
char hashstr[40];
|
||||||
|
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
|
||||||
|
|
||||||
|
sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->uid, pHead->sourceId, pRecv->connType);
|
||||||
|
|
||||||
// check if it is already allocated
|
// check if it is already allocated
|
||||||
SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
|
SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
|
||||||
|
@ -529,12 +534,12 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr
|
||||||
} else {
|
} else {
|
||||||
pConn = pRpc->connList + sid;
|
pConn = pRpc->connList + sid;
|
||||||
memset(pConn, 0, sizeof(SRpcConn));
|
memset(pConn, 0, sizeof(SRpcConn));
|
||||||
memcpy(pConn->user, user, tListLen(pConn->user));
|
memcpy(pConn->user, pHead->user, tListLen(pConn->user));
|
||||||
pConn->pRpc = pRpc;
|
pConn->pRpc = pRpc;
|
||||||
pConn->sid = sid;
|
pConn->sid = sid;
|
||||||
pConn->tranId = (uint16_t)(rand() & 0xFFFF);
|
pConn->tranId = (uint16_t)(rand() & 0xFFFF);
|
||||||
pConn->ownId = htonl(pConn->sid);
|
pConn->ownId = htonl(pConn->sid);
|
||||||
if (pRpc->afp && (*pRpc->afp)(user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) {
|
if (pRpc->afp && (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) {
|
||||||
tWarn("%s %p, user not there", pRpc->label, pConn);
|
tWarn("%s %p, user not there", pRpc->label, pConn);
|
||||||
taosFreeId(pRpc->idPool, sid); // sid shall be released
|
taosFreeId(pRpc->idPool, sid); // sid shall be released
|
||||||
terrno = TSDB_CODE_INVALID_USER;
|
terrno = TSDB_CODE_INVALID_USER;
|
||||||
|
@ -543,25 +548,33 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pConn) {
|
if (pConn) {
|
||||||
|
if (pRecv->connType == RPC_CONN_UDPS && pRpc->numOfThreads > 1) {
|
||||||
|
// UDP server, assign to new connection
|
||||||
|
pRpc->index = (pRpc->index+1) % pRpc->numOfThreads;
|
||||||
|
pConn->localPort = (pRpc->localPort + pRpc->index);
|
||||||
|
}
|
||||||
|
|
||||||
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
|
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
|
||||||
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->user);
|
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
|
||||||
|
pRpc->label, pConn, sid, pConn->user, pConn->localPort);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr) {
|
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
|
||||||
SRpcConn *pConn = NULL;
|
SRpcConn *pConn = NULL;
|
||||||
|
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
|
||||||
|
|
||||||
if (sid) {
|
if (sid) {
|
||||||
pConn = pRpc->connList + sid;
|
pConn = pRpc->connList + sid;
|
||||||
} else {
|
} else {
|
||||||
pConn = rpcAllocateServerConn(pRpc, user, hashstr);
|
pConn = rpcAllocateServerConn(pRpc, pRecv);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pConn) {
|
if (pConn) {
|
||||||
if (memcmp(pConn->user, user, tListLen(pConn->user)) != 0) {
|
if (memcmp(pConn->user, pHead->user, tListLen(pConn->user)) != 0) {
|
||||||
tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, user);
|
tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, pHead->user);
|
||||||
terrno = TSDB_CODE_MISMATCHED_METER_ID;
|
terrno = TSDB_CODE_MISMATCHED_METER_ID;
|
||||||
pConn = NULL;
|
pConn = NULL;
|
||||||
}
|
}
|
||||||
|
@ -575,13 +588,15 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
|
||||||
SRpcInfo *pRpc = pContext->pRpc;
|
SRpcInfo *pRpc = pContext->pRpc;
|
||||||
SRpcIpSet *pIpSet = &pContext->ipSet;
|
SRpcIpSet *pIpSet = &pContext->ipSet;
|
||||||
|
|
||||||
pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->ip[pIpSet->index], pIpSet->port, pRpc->user);
|
pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->ip[pIpSet->inUse], pIpSet->port, pContext->connType);
|
||||||
if ( pConn == NULL ) {
|
if ( pConn == NULL ) {
|
||||||
char ipstr[20] = {0};
|
char ipstr[20] = {0};
|
||||||
tinet_ntoa(ipstr, pIpSet->ip[pIpSet->index]);
|
tinet_ntoa(ipstr, pIpSet->ip[pIpSet->inUse]);
|
||||||
pConn = rpcOpenConn(pRpc, ipstr, pIpSet->port, pContext->connType);
|
pConn = rpcOpenConn(pRpc, ipstr, pIpSet->port, pContext->connType);
|
||||||
if (pConn) pConn->destIp = pIpSet->ip[pIpSet->index];
|
if (pConn) pConn->destIp = pIpSet->ip[pIpSet->inUse];
|
||||||
}
|
} else {
|
||||||
|
tTrace("%s %p, connection is retrieved from cache", pRpc->label, pConn);
|
||||||
|
}
|
||||||
|
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
@ -670,16 +685,16 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
int32_t sid;
|
int32_t sid;
|
||||||
SRpcConn *pConn = NULL;
|
SRpcConn *pConn = NULL;
|
||||||
char hashstr[40] = {0};
|
|
||||||
|
|
||||||
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
|
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
|
||||||
|
|
||||||
sid = htonl(pHead->destId);
|
sid = htonl(pHead->destId);
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
|
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
|
||||||
|
pHead->port = htons(pHead->port);
|
||||||
|
|
||||||
if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
|
if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
|
||||||
tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
|
tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
|
||||||
|
@ -698,8 +713,7 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
terrno = TSDB_CODE_INVALID_SESSION_ID; return NULL;
|
terrno = TSDB_CODE_INVALID_SESSION_ID; return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sid == 0) sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->uid, pHead->sourceId, pRecv->connType);
|
pConn = rpcGetConnObj(pRpc, sid, pRecv);
|
||||||
pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr);
|
|
||||||
if (pConn == NULL) return NULL;
|
if (pConn == NULL) return NULL;
|
||||||
|
|
||||||
rpcLockConn(pConn);
|
rpcLockConn(pConn);
|
||||||
|
@ -714,7 +728,7 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRecv->port) pConn->peerPort = pRecv->port;
|
if (pRecv->port) pConn->peerPort = pRecv->port;
|
||||||
if (pHead->port) pConn->peerPort = pHead->port;
|
if (pHead->port) pConn->peerPort = pHead->port;
|
||||||
if (pHead->uid) pConn->peerUid = pHead->uid;
|
if (pHead->uid) pConn->peerUid = pHead->uid;
|
||||||
|
|
||||||
terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
|
terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
|
||||||
|
@ -755,12 +769,11 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
|
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
|
||||||
SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
|
SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
|
||||||
SRpcConn *pConn = (SRpcConn *)pRecv->thandle;
|
SRpcConn *pConn = (SRpcConn *)pRecv->thandle;
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
tDump(pRecv->msg, pRecv->msgLen);
|
tDump(pRecv->msg, pRecv->msgLen);
|
||||||
|
|
||||||
// underlying UDP layer does not know it is server or client
|
// underlying UDP layer does not know it is server or client
|
||||||
pRecv->connType = pRecv->connType | pRpc->connType;
|
pRecv->connType = pRecv->connType | pRpc->connType;
|
||||||
|
|
||||||
if (pRecv->ip==0 && pConn) {
|
if (pRecv->ip==0 && pConn) {
|
||||||
rpcProcessBrokenLink(pConn);
|
rpcProcessBrokenLink(pConn);
|
||||||
|
@ -768,30 +781,31 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pConn = rpcProcessHead(pRpc, pRecv);
|
terrno = 0;
|
||||||
|
pConn = rpcProcessMsgHead(pRpc, pRecv);
|
||||||
|
|
||||||
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
|
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
|
||||||
tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d",
|
tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d port:%hu",
|
||||||
pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, code,
|
pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno,
|
||||||
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
|
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pConn && pRpc->idleTime) {
|
if (pConn && pRpc->idleTime) {
|
||||||
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_ALREADY_PROCESSED) {
|
if (terrno != TSDB_CODE_ALREADY_PROCESSED) {
|
||||||
if (code != 0) { // parsing error
|
if (terrno != 0) { // parsing error
|
||||||
if ( rpcIsReq(pHead->msgType) ) {
|
if ( rpcIsReq(pHead->msgType) ) {
|
||||||
rpcSendErrorMsgToPeer(pRecv, code);
|
rpcSendErrorMsgToPeer(pRecv, terrno);
|
||||||
tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code);
|
tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], terrno);
|
||||||
}
|
}
|
||||||
} else { // parsing OK
|
} else { // parsing OK
|
||||||
rpcProcessIncomingMsg(pConn, pHead);
|
rpcProcessIncomingMsg(pConn, pHead);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( code != 0 ) free (pRecv->msg);
|
if ( terrno ) free (pRecv->msg);
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -811,7 +825,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
int32_t code = pHead->code;
|
int32_t code = pHead->code;
|
||||||
SRpcReqContext *pContext = pConn->pContext;
|
SRpcReqContext *pContext = pConn->pContext;
|
||||||
pConn->pContext = NULL;
|
pConn->pContext = NULL;
|
||||||
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->user);
|
// for UDP, port may be changed by server, the port in ipSet shall be used for cache
|
||||||
|
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pContext->ipSet.port, pConn->connType);
|
||||||
|
|
||||||
if (code == TSDB_CODE_REDIRECT) {
|
if (code == TSDB_CODE_REDIRECT) {
|
||||||
pContext->redirect = 1;
|
pContext->redirect = 1;
|
||||||
|
@ -820,7 +835,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps);
|
tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps);
|
||||||
rpcSendReqToServer(pRpc, pContext);
|
rpcSendReqToServer(pRpc, pContext);
|
||||||
} else {
|
} else {
|
||||||
if ( pRpc->ufp && (pContext->ipSet.index != pContext->oldIndex || pContext->redirect) )
|
if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) )
|
||||||
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
|
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
|
||||||
(*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code);
|
(*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code);
|
||||||
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
|
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
|
||||||
|
@ -968,8 +983,8 @@ static void rpcProcessConnError(void *param, void *id) {
|
||||||
(*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code);
|
(*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code);
|
||||||
} else {
|
} else {
|
||||||
// move to next IP
|
// move to next IP
|
||||||
pContext->ipSet.index++;
|
pContext->ipSet.inUse++;
|
||||||
pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps;
|
pContext->ipSet.inUse = pContext->ipSet.inUse % pContext->ipSet.numOfIps;
|
||||||
rpcSendReqToServer(pRpc, pContext);
|
rpcSendReqToServer(pRpc, pContext);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,7 +51,7 @@ void processResponse(char type, void *pCont, int contLen, void *ahandle, int32_t
|
||||||
void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
|
void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
|
||||||
SInfo *pInfo = (SInfo *)handle;
|
SInfo *pInfo = (SInfo *)handle;
|
||||||
|
|
||||||
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->index);
|
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
|
||||||
pInfo->ipSet = *pIpSet;
|
pInfo->ipSet = *pIpSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,7 +92,7 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
// server info
|
// server info
|
||||||
ipSet.numOfIps = 1;
|
ipSet.numOfIps = 1;
|
||||||
ipSet.index = 0;
|
ipSet.inUse = 0;
|
||||||
ipSet.port = 7000;
|
ipSet.port = 7000;
|
||||||
ipSet.ip[0] = inet_addr(serverIp);
|
ipSet.ip[0] = inet_addr(serverIp);
|
||||||
ipSet.ip[1] = inet_addr("192.168.0.1");
|
ipSet.ip[1] = inet_addr("192.168.0.1");
|
||||||
|
@ -189,7 +189,7 @@ int main(int argc, char *argv[]) {
|
||||||
float usedTime = (endTime - startTime)/1000.0; // mseconds
|
float usedTime = (endTime - startTime)/1000.0; // mseconds
|
||||||
|
|
||||||
tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
|
tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
|
||||||
tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000*numOfReqs*appThreads/usedTime, msgSize);
|
tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
|
||||||
|
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
|
|
||||||
|
|
|
@ -76,7 +76,7 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.localPort = atoi(argv[++i]);
|
rpcInit.localPort = atoi(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-i")==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-i")==0 && i < argc-1) {
|
||||||
strcpy(rpcInit.localIp, argv[++i]);
|
strcpy(rpcInit.localIp, argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||||
rpcInit.numOfThreads = atoi(argv[++i]);
|
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
||||||
msgSize = atoi(argv[++i]);
|
msgSize = atoi(argv[++i]);
|
||||||
|
@ -92,7 +92,7 @@ int main(int argc, char *argv[]) {
|
||||||
printf("\nusage: %s [options] \n", argv[0]);
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
|
printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
|
||||||
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
|
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
|
||||||
printf(" [-t threads]: number of threads, default is:%d\n", rpcInit.numOfThreads);
|
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
|
||||||
printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
|
printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
|
||||||
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
||||||
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
||||||
|
@ -103,6 +103,7 @@ int main(int argc, char *argv[]) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsAsyncLog = 0;
|
||||||
rpcInit.connType = TAOS_CONN_SERVER;
|
rpcInit.connType = TAOS_CONN_SERVER;
|
||||||
|
|
||||||
taosInitLog("server.log", 100000, 10);
|
taosInitLog("server.log", 100000, 10);
|
||||||
|
|
Loading…
Reference in New Issue