[tbase-933]
This commit is contained in:
parent
a48c13aaf1
commit
fbe9b5f852
|
@ -3652,7 +3652,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
|
||||||
*/
|
*/
|
||||||
if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) {
|
if (pMeterMetaInfo->pMeterMeta == NULL || !tscQueryOnMetric(pCmd)) {
|
||||||
if (pMeterMetaInfo->pMeterMeta) {
|
if (pMeterMetaInfo->pMeterMeta) {
|
||||||
tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%d, addr:%p", pSql,
|
tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%lld, addr:%p", pSql,
|
||||||
pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta);
|
pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta);
|
||||||
}
|
}
|
||||||
tscWaitingForCreateTable(&pSql->cmd);
|
tscWaitingForCreateTable(&pSql->cmd);
|
||||||
|
|
|
@ -224,7 +224,7 @@ typedef struct {
|
||||||
char meterId[TSDB_UNI_LEN];
|
char meterId[TSDB_UNI_LEN];
|
||||||
uint16_t port; // for UDP only
|
uint16_t port; // for UDP only
|
||||||
char empty[1];
|
char empty[1];
|
||||||
char msgType;
|
uint8_t msgType;
|
||||||
int32_t msgLen;
|
int32_t msgLen;
|
||||||
uint8_t content[0];
|
uint8_t content[0];
|
||||||
} STaosHeader;
|
} STaosHeader;
|
||||||
|
|
|
@ -256,6 +256,8 @@ SGlobalConfig *tsGetConfigOption(const char *option);
|
||||||
#define TSDB_CFG_OPTION_LEN 24
|
#define TSDB_CFG_OPTION_LEN 24
|
||||||
#define TSDB_CFG_VALUE_LEN 41
|
#define TSDB_CFG_VALUE_LEN 41
|
||||||
|
|
||||||
|
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -14,7 +14,6 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include "shash.h"
|
#include "shash.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tidpool.h"
|
#include "tidpool.h"
|
||||||
|
@ -30,6 +29,7 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tudp.h"
|
#include "tudp.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "lz4.h"
|
||||||
|
|
||||||
#pragma GCC diagnostic ignored "-Wpointer-to-int-cast"
|
#pragma GCC diagnostic ignored "-Wpointer-to-int-cast"
|
||||||
|
|
||||||
|
@ -50,8 +50,7 @@ typedef struct {
|
||||||
char encrypt;
|
char encrypt;
|
||||||
uint8_t secret[TSDB_KEY_LEN];
|
uint8_t secret[TSDB_KEY_LEN];
|
||||||
uint8_t ckey[TSDB_KEY_LEN];
|
uint8_t ckey[TSDB_KEY_LEN];
|
||||||
|
uint16_t localPort; // for UDP only
|
||||||
uint16_t localPort; // for UDP only
|
|
||||||
uint32_t peerUid;
|
uint32_t peerUid;
|
||||||
uint32_t peerIp; // peer IP
|
uint32_t peerIp; // peer IP
|
||||||
uint16_t peerPort; // peer port
|
uint16_t peerPort; // peer port
|
||||||
|
@ -66,7 +65,7 @@ typedef struct {
|
||||||
void * chandle; // handle passed by TCP/UDP connection layer
|
void * chandle; // handle passed by TCP/UDP connection layer
|
||||||
void * ahandle; // handle returned by upper app layter
|
void * ahandle; // handle returned by upper app layter
|
||||||
int retry;
|
int retry;
|
||||||
int tretry; // total retry
|
int tretry; // total retry
|
||||||
void * pTimer;
|
void * pTimer;
|
||||||
void * pIdleTimer;
|
void * pIdleTimer;
|
||||||
char * pRspMsg;
|
char * pRspMsg;
|
||||||
|
@ -79,7 +78,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int sessions;
|
int sessions;
|
||||||
void * qhandle; // for scheduler
|
void * qhandle; // for scheduler
|
||||||
SRpcConn * connList;
|
SRpcConn * connList;
|
||||||
void * idPool;
|
void * idPool;
|
||||||
void * tmrCtrl;
|
void * tmrCtrl;
|
||||||
|
@ -94,11 +93,11 @@ typedef struct rpc_server {
|
||||||
int mask;
|
int mask;
|
||||||
int numOfChanns;
|
int numOfChanns;
|
||||||
int numOfThreads;
|
int numOfThreads;
|
||||||
int idMgmt; // ID management method
|
int idMgmt; // ID management method
|
||||||
int type;
|
int type;
|
||||||
int idleTime; // milliseconds;
|
int idleTime; // milliseconds;
|
||||||
int noFree; // do not free the request msg when rsp is received
|
int noFree; // do not free the request msg when rsp is received
|
||||||
int index; // for UDP server, next thread for new connection
|
int index; // for UDP server, next thread for new connection
|
||||||
uint16_t localPort;
|
uint16_t localPort;
|
||||||
char label[12];
|
char label[12];
|
||||||
void *(*fp)(char *, void *ahandle, void *thandle);
|
void *(*fp)(char *, void *ahandle, void *thandle);
|
||||||
|
@ -107,8 +106,7 @@ typedef struct rpc_server {
|
||||||
SRpcChann *channList;
|
SRpcChann *channList;
|
||||||
} STaosRpc;
|
} STaosRpc;
|
||||||
|
|
||||||
|
int tsRpcProgressTime = 10; // milliseocnds
|
||||||
int tsRpcProgressTime = 10; // milliseocnds
|
|
||||||
|
|
||||||
// not configurable
|
// not configurable
|
||||||
int tsRpcMaxRetry;
|
int tsRpcMaxRetry;
|
||||||
|
@ -141,6 +139,89 @@ void taosProcessSchedMsg(SSchedMsg *pMsg);
|
||||||
int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
|
int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
|
||||||
int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
|
int taosBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey);
|
||||||
|
|
||||||
|
static int32_t taosCompressRpcMsg(char* pCont, int32_t contLen) {
|
||||||
|
STaosHeader* pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
|
||||||
|
int32_t overhead = sizeof(int32_t) * 2;
|
||||||
|
int32_t finalLen = 0;
|
||||||
|
|
||||||
|
if (!NEEDTO_COMPRESSS_MSG(contLen)) {
|
||||||
|
return contLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *buf = malloc (contLen + overhead + 8); // 16 extra bytes
|
||||||
|
if (buf == NULL) {
|
||||||
|
tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno));
|
||||||
|
return contLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* only the compressed size is less than the value of contLen - overhead, the compression is applied
|
||||||
|
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
|
||||||
|
*/
|
||||||
|
if (compLen < contLen - overhead) {
|
||||||
|
//tDump(pCont, contLen);
|
||||||
|
int32_t *pLen = (int32_t *)pCont;
|
||||||
|
|
||||||
|
*pLen = 0; // first 4 bytes must be zero
|
||||||
|
pLen = (int32_t *)(pCont + sizeof(int32_t));
|
||||||
|
|
||||||
|
*pLen = htonl(contLen); // contLen is encoded in second 4 bytes
|
||||||
|
memcpy(pCont + overhead, buf, compLen);
|
||||||
|
|
||||||
|
pHeader->comp = 1;
|
||||||
|
tTrace("compress rpc msg, before:%lld, after:%lld", contLen, compLen);
|
||||||
|
|
||||||
|
finalLen = compLen + overhead;
|
||||||
|
//tDump(pCont, contLen);
|
||||||
|
} else {
|
||||||
|
finalLen = contLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
free(buf);
|
||||||
|
return finalLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static STaosHeader* taosDecompressRpcMsg(STaosHeader* pHeader, SSchedMsg* pSchedMsg, int32_t msgLen) {
|
||||||
|
int overhead = sizeof(int32_t) * 2;
|
||||||
|
|
||||||
|
if (pHeader->comp == 0) {
|
||||||
|
pSchedMsg->msg = (char *)(&(pHeader->destId));
|
||||||
|
return pHeader;
|
||||||
|
}
|
||||||
|
|
||||||
|
// decompress the content
|
||||||
|
assert(GET_INT32_VAL(pHeader->content) == 0);
|
||||||
|
|
||||||
|
// contLen is original message length before compression applied
|
||||||
|
int contLen = htonl(GET_INT32_VAL(pHeader->content + sizeof(int32_t)));
|
||||||
|
|
||||||
|
// prepare the temporary buffer to decompress message
|
||||||
|
char *buf = malloc(sizeof(STaosHeader) + contLen);
|
||||||
|
|
||||||
|
//tDump(pHeader->content, msgLen);
|
||||||
|
|
||||||
|
if (buf) {
|
||||||
|
int32_t originalLen = LZ4_decompress_safe(pHeader->content + overhead, buf + sizeof(STaosHeader),
|
||||||
|
msgLen - overhead, contLen);
|
||||||
|
|
||||||
|
memcpy(buf, pHeader, sizeof(STaosHeader));
|
||||||
|
free(pHeader); // free the compressed message buffer
|
||||||
|
|
||||||
|
STaosHeader* pNewHeader = (STaosHeader *) buf;
|
||||||
|
pNewHeader->msgLen = originalLen + (int) sizeof(SIntMsg);
|
||||||
|
assert(originalLen == contLen);
|
||||||
|
|
||||||
|
pSchedMsg->msg = (char *)(&(pNewHeader->destId));
|
||||||
|
//tDump(pHeader->content, contLen);
|
||||||
|
return pNewHeader;
|
||||||
|
} else {
|
||||||
|
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno));
|
||||||
|
pSchedMsg->msg = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
char *taosBuildReqHeader(void *param, char type, char *msg) {
|
char *taosBuildReqHeader(void *param, char type, char *msg) {
|
||||||
STaosHeader *pHeader;
|
STaosHeader *pHeader;
|
||||||
SRpcConn * pConn = (SRpcConn *)param;
|
SRpcConn * pConn = (SRpcConn *)param;
|
||||||
|
@ -1074,8 +1155,9 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
// parsing error
|
// parsing error
|
||||||
|
|
||||||
if (pHeader->msgType & 1) {
|
if (pHeader->msgType & 1U) {
|
||||||
memset(pReply, 0, sizeof(pReply));
|
memset(pReply, 0, sizeof(pReply));
|
||||||
|
|
||||||
msgLen = taosBuildErrorMsgToPeer(data, code, pReply);
|
msgLen = taosBuildErrorMsgToPeer(data, code, pReply);
|
||||||
(*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle);
|
(*taosSendData[pServer->type])(ip, port, pReply, msgLen, chandle);
|
||||||
tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid,
|
tTrace("%s cid:%d sid:%d id:%s, %s is sent with error code:%u pConn:%p", pServer->label, chann, sid,
|
||||||
|
@ -1090,17 +1172,17 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
|
||||||
// parsing OK
|
// parsing OK
|
||||||
|
|
||||||
// internal communication is based on TAOS protocol, a trick here to make it efficient
|
// internal communication is based on TAOS protocol, a trick here to make it efficient
|
||||||
pHeader->msgLen = msgLen - (int)sizeof(STaosHeader) + (int)sizeof(SIntMsg);
|
if (pHeader->spi) msgLen -= sizeof(STaosDigest);
|
||||||
if (pHeader->spi) pHeader->msgLen -= sizeof(STaosDigest);
|
msgLen -= (int)sizeof(STaosHeader);
|
||||||
|
pHeader->msgLen = msgLen + (int)sizeof(SIntMsg);
|
||||||
|
|
||||||
if ((pHeader->msgType & 1) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) {
|
if ((pHeader->msgType & 1U) == 0 && (pHeader->content[0] == TSDB_CODE_INVALID_VALUE)) {
|
||||||
schedMsg.msg = NULL; // connection shall be closed
|
schedMsg.msg = NULL; // connection shall be closed
|
||||||
} else {
|
} else {
|
||||||
schedMsg.msg = (char *)(&(pHeader->destId));
|
pHeader = taosDecompressRpcMsg(pHeader, &schedMsg, msgLen);
|
||||||
// memcpy(schedMsg.msg, (char *)(&(pHeader->destId)), pHeader->msgLen);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
|
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16U)) {
|
||||||
tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid,
|
tTrace("%s cid:%d sid:%d id:%s, %s is put into queue, msgLen:%d pConn:%p pTimer:%p", pServer->label, chann, sid,
|
||||||
pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer);
|
pHeader->meterId, taosMsg[pHeader->msgType], pHeader->msgLen, pConn, pConn->pTimer);
|
||||||
}
|
}
|
||||||
|
@ -1132,9 +1214,12 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
|
||||||
pChann = pServer->channList + pConn->chann;
|
pChann = pServer->channList + pConn->chann;
|
||||||
pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
|
pHeader = (STaosHeader *)(pCont - sizeof(STaosHeader));
|
||||||
msg = (char *)pHeader;
|
msg = (char *)pHeader;
|
||||||
msgLen = contLen + (int32_t)sizeof(STaosHeader);
|
|
||||||
|
|
||||||
if ((pHeader->msgType & 1) == 0 && pConn->localPort) pHeader->port = pConn->localPort;
|
if ((pHeader->msgType & 1U) == 0 && pConn->localPort) pHeader->port = pConn->localPort;
|
||||||
|
|
||||||
|
contLen = taosCompressRpcMsg(pCont, contLen);
|
||||||
|
|
||||||
|
msgLen = contLen + (int32_t)sizeof(STaosHeader);
|
||||||
|
|
||||||
if (pConn->spi) {
|
if (pConn->spi) {
|
||||||
// add auth part
|
// add auth part
|
||||||
|
@ -1151,7 +1236,7 @@ int taosSendMsgToPeerH(void *thandle, char *pCont, int contLen, void *ahandle) {
|
||||||
pthread_mutex_lock(&pChann->mutex);
|
pthread_mutex_lock(&pChann->mutex);
|
||||||
msgType = pHeader->msgType;
|
msgType = pHeader->msgType;
|
||||||
|
|
||||||
if ((msgType & 1) == 0) {
|
if ((msgType & 1U) == 0) {
|
||||||
// response
|
// response
|
||||||
pConn->inType = 0;
|
pConn->inType = 0;
|
||||||
tfree(pConn->pRspMsg);
|
tfree(pConn->pRspMsg);
|
||||||
|
|
|
@ -675,7 +675,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
||||||
|
|
||||||
// send create message to the selected vnode servers
|
// send create message to the selected vnode servers
|
||||||
if (pCreate->numOfTags == 0) {
|
if (pCreate->numOfTags == 0) {
|
||||||
mTrace("table:%s, send create msg to dnode, vgId:%d, sid:%d, vnode:%d",
|
mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d",
|
||||||
pMeter->meterId, pMeter->gid.vgId, pMeter->gid.sid, pVgroup->vnodeGid[0].vnode);
|
pMeter->meterId, pMeter->gid.vgId, pMeter->gid.sid, pVgroup->vnodeGid[0].vnode);
|
||||||
|
|
||||||
grantAddTimeSeries(pMeter->numOfColumns - 1);
|
grantAddTimeSeries(pMeter->numOfColumns - 1);
|
||||||
|
|
|
@ -644,6 +644,7 @@ static void doInitGlobalConfig() {
|
||||||
tsInitConfigOption(cfg++, "defaultPass", tsDefaultPass, TSDB_CFG_VTYPE_STRING,
|
tsInitConfigOption(cfg++, "defaultPass", tsDefaultPass, TSDB_CFG_VTYPE_STRING,
|
||||||
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT,
|
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT,
|
||||||
0, 0, TSDB_PASSWORD_LEN, TSDB_CFG_UTYPE_NONE);
|
0, 0, TSDB_PASSWORD_LEN, TSDB_CFG_UTYPE_NONE);
|
||||||
|
|
||||||
// socket type, udp by default
|
// socket type, udp by default
|
||||||
tsInitConfigOption(cfg++, "sockettype", tsSocketType, TSDB_CFG_VTYPE_STRING,
|
tsInitConfigOption(cfg++, "sockettype", tsSocketType, TSDB_CFG_VTYPE_STRING,
|
||||||
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW,
|
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW,
|
||||||
|
|
Loading…
Reference in New Issue