enh(rpc): update auth way
This commit is contained in:
parent
b602a77cc9
commit
8d03ff6f9b
|
@ -331,6 +331,8 @@ typedef struct {
|
||||||
int32_t pid;
|
int32_t pid;
|
||||||
char app[TSDB_APP_NAME_LEN];
|
char app[TSDB_APP_NAME_LEN];
|
||||||
char db[TSDB_DB_NAME_LEN];
|
char db[TSDB_DB_NAME_LEN];
|
||||||
|
char user[TSDB_USER_LEN];
|
||||||
|
char passwd[TSDB_PASSWORD_LEN];
|
||||||
int64_t startTime;
|
int64_t startTime;
|
||||||
} SConnectReq;
|
} SConnectReq;
|
||||||
|
|
||||||
|
@ -482,7 +484,7 @@ typedef struct {
|
||||||
char intervalUnit;
|
char intervalUnit;
|
||||||
char slidingUnit;
|
char slidingUnit;
|
||||||
char
|
char
|
||||||
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
|
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
|
|
|
@ -306,7 +306,6 @@ void hbMgrInitMqHbRspHandle();
|
||||||
|
|
||||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery);
|
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -146,7 +146,8 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
|
||||||
(*pRequest)->sqlstr[sqlLen] = 0;
|
(*pRequest)->sqlstr[sqlLen] = 0;
|
||||||
(*pRequest)->sqlLen = sqlLen;
|
(*pRequest)->sqlLen = sqlLen;
|
||||||
|
|
||||||
if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self, sizeof((*pRequest)->self))) {
|
if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
|
||||||
|
sizeof((*pRequest)->self))) {
|
||||||
destroyRequest(*pRequest);
|
destroyRequest(*pRequest);
|
||||||
*pRequest = NULL;
|
*pRequest = NULL;
|
||||||
tscError("put request to request hash failed");
|
tscError("put request to request hash failed");
|
||||||
|
@ -263,7 +264,8 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
|
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
|
||||||
if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO && precision != TSDB_TIME_PRECISION_NANO) {
|
if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
|
||||||
|
precision != TSDB_TIME_PRECISION_NANO) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -275,7 +277,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
|
|
||||||
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
||||||
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
|
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
|
||||||
pRequest->metric.start, &res);
|
pRequest->metric.start, &res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (pRequest->body.queryJob != 0) {
|
if (pRequest->body.queryJob != 0) {
|
||||||
schedulerFreeJob(pRequest->body.queryJob);
|
schedulerFreeJob(pRequest->body.queryJob);
|
||||||
|
@ -300,7 +302,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) {
|
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) {
|
||||||
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
switch (pQuery->execMode) {
|
switch (pQuery->execMode) {
|
||||||
|
@ -328,7 +330,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
|
||||||
if (!keepQuery) {
|
if (!keepQuery) {
|
||||||
qDestroyQuery(pQuery);
|
qDestroyQuery(pQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
|
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
|
||||||
pRequest->code = terrno;
|
pRequest->code = terrno;
|
||||||
}
|
}
|
||||||
|
@ -336,7 +338,6 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
|
||||||
return pRequest;
|
return pRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
|
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
SQuery* pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
|
@ -522,6 +523,8 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) {
|
||||||
connectReq.pid = htonl(appInfo.pid);
|
connectReq.pid = htonl(appInfo.pid);
|
||||||
connectReq.startTime = htobe64(appInfo.startTime);
|
connectReq.startTime = htobe64(appInfo.startTime);
|
||||||
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
|
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
|
||||||
|
tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
|
||||||
|
tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
|
||||||
|
|
||||||
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
|
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
|
||||||
void* pReq = taosMemoryMalloc(contLen);
|
void* pReq = taosMemoryMalloc(contLen);
|
||||||
|
@ -726,7 +729,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
|
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
|
||||||
ASSERT(len <= bytes);
|
ASSERT(len <= bytes);
|
||||||
ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i]));
|
ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i]));
|
||||||
|
|
||||||
varDataSetLen(p, len);
|
varDataSetLen(p, len);
|
||||||
pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
|
pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
|
||||||
p += (len + VARSTR_HEADER_SIZE);
|
p += (len + VARSTR_HEADER_SIZE);
|
||||||
|
@ -744,51 +747,55 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
|
||||||
}
|
}
|
||||||
|
|
||||||
pResultInfo->convertBuf[i] = p;
|
pResultInfo->convertBuf[i] = p;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
SResultColumn* pCol = &pResultInfo->pCol[i];
|
SResultColumn* pCol = &pResultInfo->pCol[i];
|
||||||
for (int32_t j = 0; j < numOfRows; ++j) {
|
for (int32_t j = 0; j < numOfRows; ++j) {
|
||||||
if (pCol->offset[j] != -1) {
|
if (pCol->offset[j] != -1) {
|
||||||
char* pStart = pCol->offset[j] + pCol->pData;
|
char* pStart = pCol->offset[j] + pCol->pData;
|
||||||
|
|
||||||
int32_t jsonInnerType = *pStart;
|
int32_t jsonInnerType = *pStart;
|
||||||
char *jsonInnerData = pStart + CHAR_BYTES;
|
char* jsonInnerData = pStart + CHAR_BYTES;
|
||||||
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
|
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
|
||||||
if(jsonInnerType == TSDB_DATA_TYPE_NULL){
|
if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
|
||||||
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
|
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
|
||||||
varDataSetLen(dst, strlen(varDataVal(dst)));
|
varDataSetLen(dst, strlen(varDataVal(dst)));
|
||||||
}else if(jsonInnerType == TSDB_DATA_TYPE_JSON){
|
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
|
||||||
int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst));
|
int32_t length =
|
||||||
|
taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst));
|
||||||
|
|
||||||
if (length <= 0) {
|
if (length <= 0) {
|
||||||
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, varDataVal(jsonInnerData));
|
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
|
||||||
|
varDataVal(jsonInnerData));
|
||||||
length = 0;
|
length = 0;
|
||||||
}
|
}
|
||||||
varDataSetLen(dst, length);
|
varDataSetLen(dst, length);
|
||||||
}else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
|
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
|
||||||
*(char*)varDataVal(dst) = '\"';
|
*(char*)varDataVal(dst) = '\"';
|
||||||
int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst) + CHAR_BYTES);
|
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
|
||||||
|
varDataVal(dst) + CHAR_BYTES);
|
||||||
if (length <= 0) {
|
if (length <= 0) {
|
||||||
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, varDataVal(jsonInnerData));
|
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
|
||||||
|
varDataVal(jsonInnerData));
|
||||||
length = 0;
|
length = 0;
|
||||||
}
|
}
|
||||||
varDataSetLen(dst, length + CHAR_BYTES*2);
|
varDataSetLen(dst, length + CHAR_BYTES * 2);
|
||||||
*(char*)(varDataVal(dst), length + CHAR_BYTES) = '\"';
|
*(char*)(varDataVal(dst), length + CHAR_BYTES) = '\"';
|
||||||
}else if(jsonInnerType == TSDB_DATA_TYPE_DOUBLE){
|
} else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
double jsonVd = *(double*)(jsonInnerData);
|
double jsonVd = *(double*)(jsonInnerData);
|
||||||
sprintf(varDataVal(dst), "%.9lf", jsonVd);
|
sprintf(varDataVal(dst), "%.9lf", jsonVd);
|
||||||
varDataSetLen(dst, strlen(varDataVal(dst)));
|
varDataSetLen(dst, strlen(varDataVal(dst)));
|
||||||
}else if(jsonInnerType == TSDB_DATA_TYPE_BIGINT){
|
} else if (jsonInnerType == TSDB_DATA_TYPE_BIGINT) {
|
||||||
int64_t jsonVd = *(int64_t*)(jsonInnerData);
|
int64_t jsonVd = *(int64_t*)(jsonInnerData);
|
||||||
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
|
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
|
||||||
varDataSetLen(dst, strlen(varDataVal(dst)));
|
varDataSetLen(dst, strlen(varDataVal(dst)));
|
||||||
}else if(jsonInnerType == TSDB_DATA_TYPE_BOOL){
|
} else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
|
||||||
sprintf(varDataVal(dst), "%s", (*((char *)jsonInnerData) == 1) ? "true" : "false");
|
sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
|
||||||
varDataSetLen(dst, strlen(varDataVal(dst)));
|
varDataSetLen(dst, strlen(varDataVal(dst)));
|
||||||
}else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(len + varDataTLen(dst) > colLength[i]){
|
if (len + varDataTLen(dst) > colLength[i]) {
|
||||||
p = taosMemoryRealloc(pResultInfo->convertBuf[i], len + varDataTLen(dst));
|
p = taosMemoryRealloc(pResultInfo->convertBuf[i], len + varDataTLen(dst));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -2756,6 +2756,8 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
|
||||||
if (tEncodeI32(&encoder, pReq->pid) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->pid) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->app) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->app) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->passwd) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -2773,6 +2775,8 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
|
||||||
if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->passwd) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->startTime) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->startTime) < 0) return -1;
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
@ -3032,7 +3036,6 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) {
|
int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) {
|
||||||
SCoder encoder = {0};
|
SCoder encoder = {0};
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
@ -3052,7 +3055,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
|
||||||
SReplica *pReplica = &pReq->replicas[i];
|
SReplica *pReplica = &pReq->replicas[i];
|
||||||
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
|
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -3085,7 +3088,6 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
|
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
|
||||||
SCoder encoder = {0};
|
SCoder encoder = {0};
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
|
@ -119,10 +119,10 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans * pTrans = &pDnode->trans;
|
||||||
tmsg_t msgType = pMsg->msgType;
|
tmsg_t msgType = pMsg->msgType;
|
||||||
bool isReq = msgType & 1u;
|
bool isReq = msgType & 1u;
|
||||||
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
|
SMsgHandle * pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
|
||||||
SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
|
SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
|
||||||
|
|
||||||
if (msgType == TDMT_DND_SERVER_STATUS) {
|
if (msgType == TDMT_DND_SERVER_STATUS) {
|
||||||
|
@ -443,7 +443,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
|
||||||
SAuthReq authReq = {0};
|
SAuthReq authReq = {0};
|
||||||
tstrncpy(authReq.user, user, TSDB_USER_LEN);
|
tstrncpy(authReq.user, user, TSDB_USER_LEN);
|
||||||
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
|
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
|
||||||
void *pReq = rpcMallocCont(contLen);
|
void * pReq = rpcMallocCont(contLen);
|
||||||
tSerializeSAuthReq(pReq, contLen, &authReq);
|
tSerializeSAuthReq(pReq, contLen, &authReq);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
|
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
|
||||||
|
@ -523,4 +523,4 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
|
||||||
.pWrapper = pWrapper,
|
.pWrapper = pWrapper,
|
||||||
};
|
};
|
||||||
return msgCb;
|
return msgCb;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,20 +24,20 @@
|
||||||
#include "version.h"
|
#include "version.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint32_t id;
|
uint32_t id;
|
||||||
int8_t connType;
|
int8_t connType;
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
|
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
|
||||||
int64_t appStartTimeMs; // app start time
|
int64_t appStartTimeMs; // app start time
|
||||||
int32_t pid; // pid of app that invokes taosc
|
int32_t pid; // pid of app that invokes taosc
|
||||||
uint32_t ip;
|
uint32_t ip;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
int8_t killed;
|
int8_t killed;
|
||||||
int64_t loginTimeMs;
|
int64_t loginTimeMs;
|
||||||
int64_t lastAccessTimeMs;
|
int64_t lastAccessTimeMs;
|
||||||
uint64_t killId;
|
uint64_t killId;
|
||||||
int32_t numOfQueries;
|
int32_t numOfQueries;
|
||||||
SArray *pQueries; //SArray<SQueryDesc>
|
SArray * pQueries; // SArray<SQueryDesc>
|
||||||
} SConnObj;
|
} SConnObj;
|
||||||
|
|
||||||
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
|
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
|
||||||
|
@ -45,7 +45,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
|
||||||
static void mndFreeConn(SConnObj *pConn);
|
static void mndFreeConn(SConnObj *pConn);
|
||||||
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
|
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
|
||||||
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
|
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
|
||||||
static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
|
static void * mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
|
||||||
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq);
|
static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessConnectReq(SNodeMsg *pReq);
|
static int32_t mndProcessConnectReq(SNodeMsg *pReq);
|
||||||
|
@ -71,9 +71,9 @@ int32_t mndInitProfile(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
|
||||||
|
|
||||||
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
|
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
|
||||||
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
|
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -91,7 +91,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
|
||||||
int32_t pid, const char *app, int64_t startTime) {
|
int32_t pid, const char *app, int64_t startTime) {
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
char connStr[255] = {0};
|
char connStr[255] = {0};
|
||||||
int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
|
int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
|
||||||
int32_t connId = mndGenerateUid(connStr, len);
|
int32_t connId = mndGenerateUid(connStr, len);
|
||||||
if (startTime == 0) startTime = taosGetTimestampMs();
|
if (startTime == 0) startTime = taosGetTimestampMs();
|
||||||
|
@ -174,10 +174,10 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
|
static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode * pMnode = pReq->pNode;
|
||||||
SUserObj *pUser = NULL;
|
SUserObj * pUser = NULL;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj * pDb = NULL;
|
||||||
SConnObj *pConn = NULL;
|
SConnObj * pConn = NULL;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SConnectReq connReq = {0};
|
SConnectReq connReq = {0};
|
||||||
char ip[30] = {0};
|
char ip[30] = {0};
|
||||||
|
@ -194,6 +194,11 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
|
||||||
mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
|
mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
|
||||||
goto CONN_OVER;
|
goto CONN_OVER;
|
||||||
}
|
}
|
||||||
|
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
|
||||||
|
mError("user:%s, failed to auth while acquire user\n %s \r\n %s", pReq->user, connReq.passwd, pUser->pass);
|
||||||
|
code = TSDB_CODE_RPC_AUTH_FAILURE;
|
||||||
|
goto CONN_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
if (connReq.db[0]) {
|
if (connReq.db[0]) {
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
@ -253,8 +258,8 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
|
||||||
|
|
||||||
pConn->pQueries = pBasic->queryDesc;
|
pConn->pQueries = pBasic->queryDesc;
|
||||||
pBasic->queryDesc = NULL;
|
pBasic->queryDesc = NULL;
|
||||||
|
|
||||||
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
|
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -324,9 +329,10 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, SClientHbBatchRsp *pBatchRsp) {
|
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
|
||||||
|
SClientHbBatchRsp *pBatchRsp) {
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
|
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
|
||||||
|
|
||||||
if (pHbReq->query) {
|
if (pHbReq->query) {
|
||||||
SQueryHbReqBasic *pBasic = pHbReq->query;
|
SQueryHbReqBasic *pBasic = pHbReq->query;
|
||||||
|
@ -335,8 +341,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
rpcGetConnInfo(pMsg->handle, &connInfo);
|
rpcGetConnInfo(pMsg->handle, &connInfo);
|
||||||
|
|
||||||
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
|
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort, pBasic->pid, pBasic->app, 0);
|
pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
|
||||||
|
pBasic->pid, pBasic->app, 0);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
|
mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -345,7 +352,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
}
|
}
|
||||||
} else if (pConn->killed) {
|
} else if (pConn->killed) {
|
||||||
mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
|
mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
|
||||||
mndReleaseConn(pMnode, pConn);
|
mndReleaseConn(pMnode, pConn);
|
||||||
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
|
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -369,8 +376,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
}
|
}
|
||||||
|
|
||||||
rspBasic->connId = pConn->id;
|
rspBasic->connId = pConn->id;
|
||||||
rspBasic->totalDnodes = 1; //TODO
|
rspBasic->totalDnodes = 1; // TODO
|
||||||
rspBasic->onlineDnodes = 1; //TODO
|
rspBasic->onlineDnodes = 1; // TODO
|
||||||
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
|
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
|
||||||
mndReleaseConn(pMnode, pConn);
|
mndReleaseConn(pMnode, pConn);
|
||||||
|
|
||||||
|
@ -379,7 +386,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
|
|
||||||
int32_t kvNum = taosHashGetSize(pHbReq->info);
|
int32_t kvNum = taosHashGetSize(pHbReq->info);
|
||||||
if (NULL == pHbReq->info || kvNum <= 0) {
|
if (NULL == pHbReq->info || kvNum <= 0) {
|
||||||
taosArrayPush(pBatchRsp->rsps, &hbRsp);
|
taosArrayPush(pBatchRsp->rsps, &hbRsp);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -396,7 +403,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
|
|
||||||
switch (kv->key) {
|
switch (kv->key) {
|
||||||
case HEARTBEAT_KEY_DBINFO: {
|
case HEARTBEAT_KEY_DBINFO: {
|
||||||
void *rspMsg = NULL;
|
void * rspMsg = NULL;
|
||||||
int32_t rspLen = 0;
|
int32_t rspLen = 0;
|
||||||
mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
|
mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
|
||||||
if (rspMsg && rspLen > 0) {
|
if (rspMsg && rspLen > 0) {
|
||||||
|
@ -406,7 +413,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case HEARTBEAT_KEY_STBINFO: {
|
case HEARTBEAT_KEY_STBINFO: {
|
||||||
void *rspMsg = NULL;
|
void * rspMsg = NULL;
|
||||||
int32_t rspLen = 0;
|
int32_t rspLen = 0;
|
||||||
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
|
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
|
||||||
if (rspMsg && rspLen > 0) {
|
if (rspMsg && rspLen > 0) {
|
||||||
|
@ -457,7 +464,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
|
||||||
taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
|
taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
|
||||||
|
|
||||||
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
|
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
|
||||||
void *buf = rpcMallocCont(tlen);
|
void * buf = rpcMallocCont(tlen);
|
||||||
tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);
|
tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);
|
||||||
|
|
||||||
int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
|
int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
|
||||||
|
@ -479,7 +486,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
|
static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode * pMnode = pReq->pNode;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
|
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
|
@ -513,7 +520,7 @@ static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
|
static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode * pMnode = pReq->pNode;
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
|
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
|
@ -545,11 +552,11 @@ static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode * pMnode = pReq->pNode;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
SConnObj *pConn = NULL;
|
SConnObj *pConn = NULL;
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
char *pWrite;
|
char * pWrite;
|
||||||
char ipStr[TSDB_IPv4ADDR_LEN + 6];
|
char ipStr[TSDB_IPv4ADDR_LEN + 6];
|
||||||
|
|
||||||
if (pShow->pIter == NULL) {
|
if (pShow->pIter == NULL) {
|
||||||
|
@ -604,8 +611,8 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
#if 0
|
#if 0
|
||||||
SConnObj *pConn = NULL;
|
SConnObj *pConn = NULL;
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
@ -703,7 +710,7 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i
|
||||||
}
|
}
|
||||||
|
|
||||||
pShow->numOfRows += numOfRows;
|
pShow->numOfRows += numOfRows;
|
||||||
#endif
|
#endif
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -158,6 +158,7 @@ typedef struct {
|
||||||
char secured : 2;
|
char secured : 2;
|
||||||
char spi : 2;
|
char spi : 2;
|
||||||
|
|
||||||
|
char user[TSDB_UNI_LEN];
|
||||||
uint64_t ahandle; // ahandle assigned by client
|
uint64_t ahandle; // ahandle assigned by client
|
||||||
uint32_t code; // del later
|
uint32_t code; // del later
|
||||||
uint32_t msgType;
|
uint32_t msgType;
|
||||||
|
@ -186,23 +187,23 @@ typedef enum { Normal, Quit, Release, Register } STransMsgType;
|
||||||
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
|
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
|
||||||
|
|
||||||
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
||||||
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
|
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
|
||||||
|
|
||||||
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
|
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
|
||||||
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
|
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
|
||||||
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
|
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
|
||||||
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
|
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
|
||||||
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
|
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
|
||||||
#define rpcIsReq(type) (type & 1U)
|
#define rpcIsReq(type) (type & 1U)
|
||||||
|
|
||||||
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
|
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
|
||||||
|
|
||||||
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
|
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
|
||||||
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
|
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
|
||||||
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
|
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
|
||||||
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
|
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
|
||||||
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
|
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
|
||||||
#define transIsReq(type) (type & 1U)
|
#define transIsReq(type) (type & 1U)
|
||||||
|
|
||||||
int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
||||||
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
||||||
|
|
|
@ -614,35 +614,16 @@ void cliSend(SCliConn* pConn) {
|
||||||
pMsg->pCont = (void*)rpcMallocCont(0);
|
pMsg->pCont = (void*)rpcMallocCont(0);
|
||||||
pMsg->contLen = 0;
|
pMsg->contLen = 0;
|
||||||
}
|
}
|
||||||
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
|
||||||
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
|
|
||||||
|
|
||||||
int msgLen = transMsgLenFromCont(pMsg->contLen);
|
int msgLen = transMsgLenFromCont(pMsg->contLen);
|
||||||
|
|
||||||
if (!pConn->secured) {
|
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
||||||
char* buf = taosMemoryCalloc(1, msgLen + sizeof(STransUserMsg));
|
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
|
||||||
memcpy(buf, (char*)pHead, msgLen);
|
|
||||||
|
|
||||||
STransUserMsg* uMsg = (STransUserMsg*)(buf + msgLen);
|
|
||||||
memcpy(uMsg->user, pTransInst->user, tListLen(uMsg->user));
|
|
||||||
memcpy(uMsg->secret, pTransInst->secret, tListLen(uMsg->secret));
|
|
||||||
|
|
||||||
// to avoid mem leak
|
|
||||||
destroyUserdata(pMsg);
|
|
||||||
|
|
||||||
pMsg->pCont = (char*)buf + sizeof(STransMsgHead);
|
|
||||||
pMsg->contLen = msgLen + sizeof(STransUserMsg) - sizeof(STransMsgHead);
|
|
||||||
|
|
||||||
pHead = (STransMsgHead*)buf;
|
|
||||||
pHead->secured = 1;
|
|
||||||
msgLen += sizeof(STransUserMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
|
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
|
||||||
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
|
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
|
||||||
pHead->msgType = pMsg->msgType;
|
pHead->msgType = pMsg->msgType;
|
||||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
||||||
|
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||||
|
|
|
@ -46,7 +46,6 @@ typedef struct SSrvConn {
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
struct sockaddr_in locaddr;
|
struct sockaddr_in locaddr;
|
||||||
|
|
||||||
char secured;
|
|
||||||
int spi;
|
int spi;
|
||||||
char info[64];
|
char info[64];
|
||||||
char user[TSDB_UNI_LEN]; // user ID for the link
|
char user[TSDB_UNI_LEN]; // user ID for the link
|
||||||
|
@ -181,16 +180,9 @@ static void uvHandleReq(SSrvConn* pConn) {
|
||||||
uint32_t msgLen = pBuf->len;
|
uint32_t msgLen = pBuf->len;
|
||||||
|
|
||||||
STransMsgHead* pHead = (STransMsgHead*)msg;
|
STransMsgHead* pHead = (STransMsgHead*)msg;
|
||||||
if (pHead->secured == 1) {
|
|
||||||
STransUserMsg* uMsg = (STransUserMsg*)((char*)msg + msgLen - sizeof(STransUserMsg));
|
|
||||||
memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
|
|
||||||
memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
|
|
||||||
}
|
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
pHead->msgLen = htonl(pHead->msgLen);
|
||||||
if (pHead->secured == 1) {
|
memcpy(pConn->user, pHead->user, strlen(pHead->user));
|
||||||
pHead->msgLen -= sizeof(STransUserMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
CONN_SHOULD_RELEASE(pConn, pHead);
|
CONN_SHOULD_RELEASE(pConn, pHead);
|
||||||
|
|
||||||
|
@ -344,12 +336,6 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
|
||||||
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
||||||
pHead->ahandle = (uint64_t)pMsg->ahandle;
|
pHead->ahandle = (uint64_t)pMsg->ahandle;
|
||||||
|
|
||||||
// pHead->secured = pMsg->code == 0 ? 1 : 0; //
|
|
||||||
if (!pConn->secured) {
|
|
||||||
pConn->secured = pMsg->code == 0 ? 1 : 0;
|
|
||||||
}
|
|
||||||
pHead->secured = pConn->secured;
|
|
||||||
|
|
||||||
if (pConn->status == ConnNormal) {
|
if (pConn->status == ConnNormal) {
|
||||||
pHead->msgType = pConn->inType + 1;
|
pHead->msgType = pConn->inType + 1;
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue