Merge pull request #10276 from taosdata/feature/privilege

serialize drop topic req
This commit is contained in:
Shengliang Guan 2022-02-16 13:36:20 +08:00 committed by GitHub
commit 88b569e658
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 377 additions and 284 deletions

View File

@ -234,9 +234,9 @@ typedef struct {
void* pMsg; void* pMsg;
} SSubmitMsgIter; } SSubmitMsgIter;
int tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); int32_t tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter);
int tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
typedef struct { typedef struct {
@ -295,6 +295,17 @@ int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
void tFreeSMAltertbReq(SMAltertbReq* pReq); void tFreeSMAltertbReq(SMAltertbReq* pReq);
typedef struct SEpSet {
int8_t inUse;
int8_t numOfEps;
SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(void* buf, SEpSet* pEp);
typedef struct { typedef struct {
int32_t pid; int32_t pid;
char app[TSDB_APP_NAME_LEN]; char app[TSDB_APP_NAME_LEN];
@ -302,62 +313,21 @@ typedef struct {
int64_t startTime; int64_t startTime;
} SConnectReq; } SConnectReq;
typedef struct SEpSet { int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
int8_t inUse; int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
int8_t numOfEps;
SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
int tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse);
tlen += taosEncodeFixedI8(buf, pEp->numOfEps);
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
tlen += taosEncodeFixedU16(buf, pEp->eps[i].port);
tlen += taosEncodeString(buf, pEp->eps[i].fqdn);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
buf = taosDecodeFixedI8(buf, &pEp->inUse);
buf = taosDecodeFixedI8(buf, &pEp->numOfEps);
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
buf = taosDecodeFixedU16(buf, &pEp->eps[i].port);
buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn);
}
return buf;
}
static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1;
if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
static FORCE_INLINE int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp) {
if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1;
if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1;
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
typedef struct { typedef struct {
int32_t acctId; int32_t acctId;
int64_t clusterId; int64_t clusterId;
int32_t connId; int32_t connId;
int8_t superUser; int8_t superUser;
int8_t align[3];
SEpSet epSet; SEpSet epSet;
char sVersion[128]; char sVersion[128];
} SConnectRsp; } SConnectRsp;
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
int32_t tDeserializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
@ -1116,41 +1086,17 @@ typedef struct {
char* sql; char* sql;
char* physicalPlan; char* physicalPlan;
char* logicalPlan; char* logicalPlan;
} SCMCreateTopicReq; } SMCreateTopicReq;
static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { int32_t tSerializeMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq);
int tlen = 0; void* tDeserializeSMCreateTopicReq(void* buf, SMCreateTopicReq* pReq);
tlen += taosEncodeFixedI8(buf, pReq->igExists);
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeString(buf, pReq->logicalPlan);
return tlen;
}
static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) {
buf = taosDecodeFixedI8(buf, &(pReq->igExists));
buf = taosDecodeString(buf, &(pReq->name));
buf = taosDecodeString(buf, &(pReq->sql));
buf = taosDecodeString(buf, &(pReq->physicalPlan));
buf = taosDecodeString(buf, &(pReq->logicalPlan));
return buf;
}
typedef struct { typedef struct {
int64_t topicId; int64_t topicId;
} SCMCreateTopicRsp; } SMCreateTopicRsp;
static FORCE_INLINE int tSerializeSCMCreateTopicRsp(void** buf, const SCMCreateTopicRsp* pRsp) { int32_t tSerializeSMCreateTopicRsp(void* buf, int32_t bufLen, const SMCreateTopicRsp* pRsp);
int tlen = 0; int32_t tDeserializeSMCreateTopicRsp(void* buf, int32_t bufLen, SMCreateTopicRsp* pRsp);
tlen += taosEncodeFixedI64(buf, pRsp->topicId);
return tlen;
}
static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopicRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->topicId);
return buf;
}
typedef struct { typedef struct {
int32_t topicNum; int32_t topicNum;
@ -1313,19 +1259,13 @@ typedef struct {
int64_t status; int64_t status;
} SMVSubscribeRsp; } SMVSubscribeRsp;
typedef struct {
char name[TSDB_TOPIC_NAME_LEN];
int8_t igExists;
int32_t execLen;
void* executor;
int32_t sqlLen;
char* sql;
} SCreateTopicReq;
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists; int8_t igNotExists;
} SDropTopicReq; } SMDropTopicReq;
int32_t tSerializeSMDropTopicReqq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];

View File

@ -137,7 +137,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SCMCreateTopicReq, SCMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)

View File

@ -365,32 +365,30 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
} }
pMsgSendInfo->msgType = TDMT_MND_CONNECT; pMsgSendInfo->msgType = TDMT_MND_CONNECT;
pMsgSendInfo->msgInfo.len = sizeof(SConnectReq);
pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId; pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
pMsgSendInfo->param = pRequest; pMsgSendInfo->param = pRequest;
SConnectReq *pConnect = calloc(1, sizeof(SConnectReq)); SConnectReq connectReq = {0};
if (pConnect == NULL) {
tfree(pMsgSendInfo);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
STscObj* pObj = pRequest->pTscObj; STscObj* pObj = pRequest->pTscObj;
char* db = getDbOfConnection(pObj); char* db = getDbOfConnection(pObj);
if (db != NULL) { if (db != NULL) {
tstrncpy(pConnect->db, db, sizeof(pConnect->db)); tstrncpy(connectReq.db, db, sizeof(connectReq.db));
} }
tfree(db); tfree(db);
pConnect->pid = htonl(appInfo.pid); connectReq.pid = htonl(appInfo.pid);
pConnect->startTime = htobe64(appInfo.startTime); connectReq.startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
pMsgSendInfo->msgInfo.pData = pConnect; int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = malloc(contLen);
tSerializeSConnectReq(pReq, contLen, &connectReq);
pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgInfo.pData = pReq;
return pMsgSendInfo; return pMsgSendInfo;
} }

View File

@ -47,39 +47,33 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData; SConnectRsp connectRsp = {0};
pConnect->acctId = htonl(pConnect->acctId); tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp);
pConnect->connId = htonl(pConnect->connId); assert(connectRsp.epSet.numOfEps > 0);
pConnect->clusterId = htobe64(pConnect->clusterId);
assert(pConnect->epSet.numOfEps > 0); if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) { updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
pConnect->epSet.eps[i].port = htons(pConnect->epSet.eps[i].port);
} }
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) { for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pConnect->epSet);
}
for (int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) {
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i, tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
pConnect->epSet.eps[i].fqdn, pConnect->epSet.eps[i].port, pTscObj->id); connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
} }
pTscObj->connId = pConnect->connId; pTscObj->connId = connectRsp.connId;
pTscObj->acctId = pConnect->acctId; pTscObj->acctId = connectRsp.acctId;
tstrncpy(pTscObj->ver, pConnect->sVersion, tListLen(pTscObj->ver)); tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver));
// update the appInstInfo // update the appInstInfo
pTscObj->pAppInfo->clusterId = pConnect->clusterId; pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
pTscObj->connType = HEARTBEAT_TYPE_QUERY; pTscObj->connType = HEARTBEAT_TYPE_QUERY;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY); hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY);
// pRequest->body.resInfo.pRspMsg = pMsg->pData; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
pTscObj->pAppInfo->numOfConns); pTscObj->pAppInfo->numOfConns);
free(pMsg->pData); free(pMsg->pData);

View File

@ -360,7 +360,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
char topicFname[TSDB_TOPIC_FNAME_LEN] = {0}; char topicFname[TSDB_TOPIC_FNAME_LEN] = {0};
tNameExtractFullName(&name, topicFname); tNameExtractFullName(&name, topicFname);
SCMCreateTopicReq req = { SMCreateTopicReq req = {
.name = (char*)topicFname, .name = (char*)topicFname,
.igExists = 1, .igExists = 1,
.physicalPlan = (char*)pStr, .physicalPlan = (char*)pStr,
@ -368,14 +368,14 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
.logicalPlan = (char*)"no logic plan", .logicalPlan = (char*)"no logic plan",
}; };
int tlen = tSerializeSCMCreateTopicReq(NULL, &req); int tlen = tSerializeMCreateTopicReq(NULL, &req);
void* buf = malloc(tlen); void* buf = malloc(tlen);
if (buf == NULL) { if (buf == NULL) {
goto _return; goto _return;
} }
void* abuf = buf; void* abuf = buf;
tSerializeSCMCreateTopicReq(&abuf, &req); tSerializeMCreateTopicReq(&abuf, &req);
/*printf("formatted: %s\n", dagStr);*/ /*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};

View File

@ -85,6 +85,47 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
} }
} }
int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1;
if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
int32_t tDecodeSEpSet(SCoder *pDecoder, SEpSet *pEp) {
if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1;
if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse);
tlen += taosEncodeFixedI8(buf, pEp->numOfEps);
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
tlen += taosEncodeFixedU16(buf, pEp->eps[i].port);
tlen += taosEncodeString(buf, pEp->eps[i].fqdn);
}
return tlen;
}
void *taosDecodeSEpSet(void *buf, SEpSet *pEp) {
buf = taosDecodeFixedI8(buf, &pEp->inUse);
buf = taosDecodeFixedI8(buf, &pEp->numOfEps);
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
buf = taosDecodeFixedU16(buf, &pEp->eps[i].port);
buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn);
}
return buf;
}
static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) { static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) {
if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1; if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1;
@ -1788,3 +1829,140 @@ int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq
tCoderClear(&decoder); tCoderClear(&decoder);
return 0; return 0;
} }
int32_t tSerializeSMDropTopicReqq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeMCreateTopicReq(void **buf, const SMCreateTopicReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pReq->igExists);
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeString(buf, pReq->logicalPlan);
return tlen;
}
void *tDeserializeSMCreateTopicReq(void *buf, SMCreateTopicReq *pReq) {
buf = taosDecodeFixedI8(buf, &(pReq->igExists));
buf = taosDecodeString(buf, &(pReq->name));
buf = taosDecodeString(buf, &(pReq->sql));
buf = taosDecodeString(buf, &(pReq->physicalPlan));
buf = taosDecodeString(buf, &(pReq->logicalPlan));
return buf;
}
int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopicRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->topicId) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMCreateTopicRsp(void *buf, int32_t bufLen, SMCreateTopicRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->topicId) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pid) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->app) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->startTime) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->acctId) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->connId) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1;
if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}

View File

@ -243,7 +243,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
pEpSet->inUse = pEpSet->numOfEps; pEpSet->inUse = pEpSet->numOfEps;
} }
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, htons(pObj->pDnode->port)); addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pSdb, pObj); sdbRelease(pSdb, pObj);
} }
} }

View File

@ -189,10 +189,12 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SConnObj *pConn = NULL; SConnObj *pConn = NULL;
int32_t code = -1; int32_t code = -1;
SConnectReq connReq = {0};
SConnectReq *pConnReq = pReq->rpcMsg.pCont; if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) {
pConnReq->pid = htonl(pConnReq->pid); terrno = TSDB_CODE_INVALID_MSG;
pConnReq->startTime = htobe64(pConnReq->startTime); goto CONN_OVER;
}
SRpcConnInfo info = {0}; SRpcConnInfo info = {0};
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) { if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
@ -209,41 +211,42 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
goto CONN_OVER; goto CONN_OVER;
} }
if (pConnReq->db[0]) { if (connReq.db[0]) {
snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db); snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
pDb = mndAcquireDb(pMnode, pReq->db); pDb = mndAcquireDb(pMnode, pReq->db);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB; terrno = TSDB_CODE_MND_INVALID_DB;
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr()); mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr());
goto CONN_OVER; goto CONN_OVER;
} }
} }
pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime); pConn = mndCreateConn(pMnode, &info, connReq.pid, connReq.app, connReq.startTime);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr()); mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
goto CONN_OVER; goto CONN_OVER;
} }
SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp)); SConnectRsp connectRsp = {0};
if (pRsp == NULL) { connectRsp.acctId = pUser->acctId;
terrno = TSDB_CODE_OUT_OF_MEMORY; connectRsp.superUser = pUser->superUser;
mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr()); connectRsp.clusterId = pMnode->clusterId;
goto CONN_OVER; connectRsp.connId = pConn->id;
}
pRsp->acctId = htonl(pUser->acctId); snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
pRsp->superUser = pUser->superUser; gitinfo);
pRsp->clusterId = htobe64(pMnode->clusterId); mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
pRsp->connId = htonl(pConn->id);
snprintf(pRsp->sVersion, tListLen(pRsp->sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo); int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
mndGetMnodeEpSet(pMnode, &pRsp->epSet); if (contLen < 0) goto CONN_OVER;
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) goto CONN_OVER;
tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
pReq->contLen = sizeof(SConnectRsp); pReq->contLen = contLen;
pReq->pCont = pRsp; pReq->pCont = pRsp;
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app); mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, connReq.app);
code = 0; code = 0;

View File

@ -1497,7 +1497,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
if (pStb->dbUid != pDb->uid) { if (pStb->dbUid != pDb->uid) {
if (strncmp(pStb->db, pDb->name, tListLen(pStb->db)) == 0) { if (strncmp(pStb->db, pDb->name, prefixLen) == 0) {
mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pStb->name, pDb->name, pDb->uid); mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pStb->name, pDb->name, pDb->uid);
} }

View File

@ -31,12 +31,12 @@
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq);
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp);
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq); static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq);
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
int32_t mndInitTopic(SMnode *pMnode) { int32_t mndInitTopic(SMnode *pMnode) {
@ -48,8 +48,8 @@ int32_t mndInitTopic(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndTopicActionUpdate, .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
.deleteFp = (SdbDeleteFp)mndTopicActionDelete}; .deleteFp = (SdbDeleteFp)mndTopicActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicMsg); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicMsg); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
@ -225,12 +225,12 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
return pDrop; return pDrop;
} }
static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) { static int32_t mndCheckCreateTopicMsg(SMCreateTopicReq *creattopReq) {
// deserialize and other stuff // deserialize and other stuff
return 0; return 0;
} }
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq *pCreate, SDbObj *pDb) {
mDebug("topic:%s to create", pCreate->name); mDebug("topic:%s to create", pCreate->name);
SMqTopicObj topicObj = {0}; SMqTopicObj topicObj = {0};
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
@ -248,7 +248,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj);
if (pTopicRaw == NULL) return -1; if (pTopicRaw == NULL) return -1;
if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1;
/*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);*/ /*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);*/
/*mndTransAppendRedolog(pTrans, pTopicRaw);*/ /*mndTransAppendRedolog(pTrans, pTopicRaw);*/
/*if (mndTransPrepare(pMnode, pTrans) != 0) {*/ /*if (mndTransPrepare(pMnode, pTrans) != 0) {*/
/*mError("mq-createTopic-trans:%d, failed to prepare since %s", pTrans->id, terrstr());*/ /*mError("mq-createTopic-trans:%d, failed to prepare since %s", pTrans->id, terrstr());*/
@ -260,12 +260,12 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
return sdbWrite(pMnode->pSdb, pTopicRaw); return sdbWrite(pMnode->pSdb, pTopicRaw);
} }
static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
char *msgStr = pMsg->rpcMsg.pCont; char *msgStr = pReq->rpcMsg.pCont;
SCMCreateTopicReq createTopicReq = {0}; SMCreateTopicReq createTopicReq = {0};
tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq); tDeserializeSMCreateTopicReq(msgStr, &createTopicReq);
mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql); mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
@ -294,7 +294,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
return -1; return -1;
} }
int32_t code = mndCreateTopic(pMnode, pMsg, &createTopicReq, pDb); int32_t code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -306,44 +306,49 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
return 0; return 0;
} }
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; } static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { return 0; }
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SDropTopicReq *pDrop = pMsg->rpcMsg.pCont; SMDropTopicReq dropReq = {0};
mDebug("topic:%s, start to drop", pDrop->name); if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pDrop->name); mDebug("topic:%s, start to drop", dropReq.name);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
if (pTopic == NULL) { if (pTopic == NULL) {
if (pDrop->igNotExists) { if (dropReq.igNotExists) {
mDebug("topic:%s, not exist, ignore not exist is set", pDrop->name); mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
return 0; return 0;
} else { } else {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
mError("topic:%s, failed to drop since %s", pDrop->name, terrstr()); mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
return -1; return -1;
} }
} }
int32_t code = mndDropTopic(pMnode, pMsg, pTopic); int32_t code = mndDropTopic(pMnode, pReq, pTopic);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;
mError("topic:%s, failed to drop since %s", pDrop->name, terrstr()); mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
return -1; return -1;
} }
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) { static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pMsg); mndTransProcessRsp(pRsp);
return 0; return 0;
} }
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq) { static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
STableInfoReq infoReq = {0}; STableInfoReq infoReq = {0};
@ -405,8 +410,8 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq) {
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
pMsg->pCont = pMeta; pReq->pCont = pMeta;
pMsg->contLen = contLen; pReq->contLen = contLen;
mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags); mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags);
#endif #endif
@ -438,8 +443,8 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
return 0; return 0;
} }
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
if (mndGetNumOfTopics(pMnode, pShow->db, &pShow->numOfRows) != 0) { if (mndGetNumOfTopics(pMnode, pShow->db, &pShow->numOfRows) != 0) {
@ -461,18 +466,6 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *
pSchema[cols].bytes = pShow->bytes[cols]; pSchema[cols].bytes = pShow->bytes[cols];
cols++; cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = cols; pMeta->numOfColumns = cols;
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
@ -488,29 +481,19 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *
return 0; return 0;
} }
static void mndExtractTableName(char *tableId, char *name) { static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
int32_t pos = -1; SMnode *pMnode = pReq->pMnode;
int32_t num = 0;
for (pos = 0; tableId[pos] != 0; ++pos) {
if (tableId[pos] == '.') num++;
if (num == 2) break;
}
if (num == 2) {
strcpy(name, tableId + pos + 1);
}
}
static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SMqTopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
int32_t cols = 0; int32_t cols = 0;
char *pWrite; char *pWrite;
char prefix[64] = {0}; char prefix[TSDB_DB_FNAME_LEN] = {0};
tstrncpy(prefix, pShow->db, 64); SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return 0;
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix); int32_t prefixLen = (int32_t)strlen(prefix);
@ -518,7 +501,11 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic); pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
if (pTopic->dbUid != pDb->uid) {
if (strncmp(pTopic->name, prefix, prefixLen) != 0) { if (strncmp(pTopic->name, prefix, prefixLen) != 0) {
mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
}
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
continue; continue;
} }
@ -535,18 +522,11 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in
*(int64_t *)pWrite = pTopic->createTime; *(int64_t *)pWrite = pTopic->createTime;
cols++; cols++;
/*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/
/**(int32_t *)pWrite = pTopic->numOfColumns;*/
/*cols++;*/
/*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/
/**(int32_t *)pWrite = pTopic->numOfTags;*/
/*cols++;*/
numOfRows++; numOfRows++;
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
} }
mndReleaseDb(pMnode, pDb);
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
return numOfRows; return numOfRows;

View File

@ -28,44 +28,44 @@ Testbase MndTestProfile::test;
int32_t MndTestProfile::connId; int32_t MndTestProfile::connId;
TEST_F(MndTestProfile, 01_ConnectMsg) { TEST_F(MndTestProfile, 01_ConnectMsg) {
int32_t contLen = sizeof(SConnectReq); SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_profile");
strcpy(connectReq.db, "");
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
pReq->pid = htonl(1234); void* pReq = rpcMallocCont(contLen);
strcpy(pReq->app, "mnode_test_profile"); tSerializeSConnectReq(pReq, contLen, &connectReq);
strcpy(pReq->db, "");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; SConnectRsp connectRsp = {0};
ASSERT_NE(pRsp, nullptr); tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
pRsp->acctId = htonl(pRsp->acctId);
pRsp->clusterId = htobe64(pRsp->clusterId);
pRsp->connId = htonl(pRsp->connId);
pRsp->epSet.eps[0].port = htons(pRsp->epSet.eps[0].port);
EXPECT_EQ(pRsp->acctId, 1); EXPECT_EQ(connectRsp.acctId, 1);
EXPECT_GT(pRsp->clusterId, 0); EXPECT_GT(connectRsp.clusterId, 0);
EXPECT_EQ(pRsp->connId, 1); EXPECT_EQ(connectRsp.connId, 1);
EXPECT_EQ(pRsp->superUser, 1); EXPECT_EQ(connectRsp.superUser, 1);
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(connectRsp.epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(connectRsp.epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.eps[0].port, 9031); EXPECT_EQ(connectRsp.epSet.eps[0].port, 9031);
EXPECT_STREQ(pRsp->epSet.eps[0].fqdn, "localhost"); EXPECT_STREQ(connectRsp.epSet.eps[0].fqdn, "localhost");
connId = pRsp->connId; connId = connectRsp.connId;
} }
TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) { TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) {
int32_t contLen = sizeof(SConnectReq); SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_profile");
strcpy(connectReq.db, "invalid_db");
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
pReq->pid = htonl(1234); void* pReq = rpcMallocCont(contLen);
strcpy(pReq->app, "mnode_test_profile"); tSerializeSConnectReq(pReq, contLen, &connectReq);
strcpy(pReq->db, "invalid_db");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
@ -194,35 +194,33 @@ TEST_F(MndTestProfile, 05_KillConnMsg) {
} }
{ {
int32_t contLen = sizeof(SConnectReq); SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_profile");
strcpy(connectReq.db, "invalid_db");
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
pReq->pid = htonl(1234); void* pReq = rpcMallocCont(contLen);
strcpy(pReq->app, "mnode_test_profile"); tSerializeSConnectReq(pReq, contLen, &connectReq);
strcpy(pReq->db, "");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; SConnectRsp connectRsp = {0};
ASSERT_NE(pRsp, nullptr); tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
pRsp->acctId = htonl(pRsp->acctId);
pRsp->clusterId = htobe64(pRsp->clusterId);
pRsp->connId = htonl(pRsp->connId);
pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]);
EXPECT_EQ(pRsp->acctId, 1); EXPECT_EQ(connectRsp.acctId, 1);
EXPECT_GT(pRsp->clusterId, 0); EXPECT_GT(connectRsp.clusterId, 0);
EXPECT_GT(pRsp->connId, connId); EXPECT_GT(connectRsp.connId, connId);
EXPECT_EQ(pRsp->superUser, 1); EXPECT_EQ(connectRsp.superUser, 1);
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(connectRsp.epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(connectRsp.epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9031); EXPECT_EQ(connectRsp.epSet.port[0], 9031);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); EXPECT_STREQ(connectRsp.epSet.fqdn[0], "localhost");
connId = pRsp->connId; connId = connectRsp.connId;
} }
#endif #endif
} }

View File

@ -54,12 +54,14 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
} }
TEST_F(MndTestShow, 03_ShowMsg_Conn) { TEST_F(MndTestShow, 03_ShowMsg_Conn) {
int32_t contLen = sizeof(SConnectReq); SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_show");
strcpy(connectReq.db, "");
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
pReq->pid = htonl(1234); void* pReq = rpcMallocCont(contLen);
strcpy(pReq->app, "mnode_test_show"); tSerializeSConnectReq(pReq, contLen, &connectReq);
strcpy(pReq->db, "");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);