feat:[TD-28247]add grant for subscribe and stream
This commit is contained in:
parent
82f3b3a931
commit
2798ff824c
|
@ -48,6 +48,7 @@ typedef enum {
|
||||||
TSDB_GRANT_CPU_CORES,
|
TSDB_GRANT_CPU_CORES,
|
||||||
TSDB_GRANT_STABLE,
|
TSDB_GRANT_STABLE,
|
||||||
TSDB_GRANT_TABLE,
|
TSDB_GRANT_TABLE,
|
||||||
|
TSDB_GRANT_SUBSCRIBE,
|
||||||
} EGrantType;
|
} EGrantType;
|
||||||
|
|
||||||
int32_t grantCheck(EGrantType grant);
|
int32_t grantCheck(EGrantType grant);
|
||||||
|
|
|
@ -3360,6 +3360,7 @@ typedef struct {
|
||||||
char name[TSDB_STREAM_FNAME_LEN];
|
char name[TSDB_STREAM_FNAME_LEN];
|
||||||
int8_t igNotExists;
|
int8_t igNotExists;
|
||||||
int8_t igUntreated;
|
int8_t igUntreated;
|
||||||
|
int8_t suspend;
|
||||||
} SMResumeStreamReq;
|
} SMResumeStreamReq;
|
||||||
|
|
||||||
int32_t tSerializeSMResumeStreamReq(void* buf, int32_t bufLen, const SMResumeStreamReq* pReq);
|
int32_t tSerializeSMResumeStreamReq(void* buf, int32_t bufLen, const SMResumeStreamReq* pReq);
|
||||||
|
@ -3754,7 +3755,12 @@ typedef struct {
|
||||||
} SMqHbReq;
|
} SMqHbReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t reserved;
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
int8_t noPrivilege;
|
||||||
|
} STopicPrivilege;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SArray* topicPrivileges; // SArray<STopicPrivilege>
|
||||||
} SMqHbRsp;
|
} SMqHbRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -3773,18 +3779,6 @@ typedef struct {
|
||||||
SVCreateTbReq cTbReq;
|
SVCreateTbReq cTbReq;
|
||||||
} SVSubmitBlk;
|
} SVSubmitBlk;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t flags;
|
|
||||||
int32_t nBlocks;
|
|
||||||
union {
|
|
||||||
SArray* pArray;
|
|
||||||
SVSubmitBlk* pBlocks;
|
|
||||||
};
|
|
||||||
} SVSubmitReq;
|
|
||||||
|
|
||||||
int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
|
|
||||||
int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead header;
|
SMsgHead header;
|
||||||
uint64_t sId;
|
uint64_t sId;
|
||||||
|
@ -3893,6 +3887,10 @@ int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
||||||
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
||||||
int32_t tDeatroySMqHbReq(SMqHbReq* pReq);
|
int32_t tDeatroySMqHbReq(SMqHbReq* pReq);
|
||||||
|
|
||||||
|
int32_t tSerializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
|
||||||
|
int32_t tDeserializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
|
||||||
|
int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp);
|
||||||
|
|
||||||
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||||
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||||
|
|
||||||
|
|
|
@ -155,6 +155,7 @@ typedef struct {
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
SArray* vgs; // SArray<SMqClientVg>
|
SArray* vgs; // SArray<SMqClientVg>
|
||||||
SSchemaWrapper schema;
|
SSchemaWrapper schema;
|
||||||
|
int8_t noPrivilege;
|
||||||
} SMqClientTopic;
|
} SMqClientTopic;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -739,6 +740,29 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
||||||
|
|
||||||
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
|
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
if (pMsg) {
|
if (pMsg) {
|
||||||
|
SMqHbRsp rsp = {0};
|
||||||
|
tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
|
||||||
|
|
||||||
|
int64_t refId = *(int64_t*)param;
|
||||||
|
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||||
|
if (tmq != NULL) {
|
||||||
|
taosWLockLatch(&tmq->lock);
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++){
|
||||||
|
STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
|
||||||
|
if(privilege->noPrivilege == 1){
|
||||||
|
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
|
||||||
|
for (int32_t j = 0; j < topicNumCur; j++) {
|
||||||
|
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
|
||||||
|
if(strcmp(pTopicCur->topicName, privilege->topic) == 0){
|
||||||
|
tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic);
|
||||||
|
pTopicCur->noPrivilege = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&tmq->lock);
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
taosMemoryFree(pMsg->pEpSet);
|
taosMemoryFree(pMsg->pEpSet);
|
||||||
}
|
}
|
||||||
|
@ -809,7 +833,9 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
|
|
||||||
sendInfo->requestId = generateRequestId();
|
sendInfo->requestId = generateRequestId();
|
||||||
sendInfo->requestObjRefId = 0;
|
sendInfo->requestObjRefId = 0;
|
||||||
sendInfo->param = NULL;
|
sendInfo->paramFreeFp = taosMemoryFree;
|
||||||
|
sendInfo->param = taosMemoryMalloc(sizeof(int64_t));
|
||||||
|
*(int64_t *)sendInfo->param = refId;
|
||||||
sendInfo->fp = tmqHbCb;
|
sendInfo->fp = tmqHbCb;
|
||||||
sendInfo->msgType = TDMT_MND_TMQ_HB;
|
sendInfo->msgType = TDMT_MND_TMQ_HB;
|
||||||
|
|
||||||
|
@ -1705,7 +1731,10 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
for (int i = 0; i < numOfTopics; i++) {
|
for (int i = 0; i < numOfTopics; i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
|
int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
|
||||||
|
if(pTopic->noPrivilege){
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
for (int j = 0; j < numOfVg; j++) {
|
for (int j = 0; j < numOfVg; j++) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms
|
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms
|
||||||
|
|
|
@ -6139,6 +6139,55 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tDeatroySMqHbRsp(SMqHbRsp *pRsp) {
|
||||||
|
taosArrayDestroy(pRsp->topicPrivileges);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
|
||||||
|
int32_t sz = taosArrayGetSize(pRsp->topicPrivileges);
|
||||||
|
if (tEncodeI32(&encoder, sz) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < sz; ++i) {
|
||||||
|
STopicPrivilege *privilege = (STopicPrivilege *)taosArrayGet(pRsp->topicPrivileges, i);
|
||||||
|
if (tEncodeCStr(&encoder, privilege->topic) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, privilege->noPrivilege) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
tDecoderInit(&decoder, (char *)buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
|
||||||
|
int32_t sz = 0;
|
||||||
|
if (tDecodeI32(&decoder, &sz) < 0) return -1;
|
||||||
|
if (sz > 0) {
|
||||||
|
pRsp->topicPrivileges = taosArrayInit(sz, sizeof(STopicPrivilege));
|
||||||
|
if (NULL == pRsp->topicPrivileges) return -1;
|
||||||
|
for (int32_t i = 0; i < sz; ++i) {
|
||||||
|
STopicPrivilege *data = taosArrayReserve(pRsp->topicPrivileges, 1);
|
||||||
|
if (tDecodeCStrTo(&decoder, data->topic) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &data->noPrivilege) < 0) return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tDeatroySMqHbReq(SMqHbReq *pReq) {
|
int32_t tDeatroySMqHbReq(SMqHbReq *pReq) {
|
||||||
for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) {
|
for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) {
|
||||||
TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i);
|
TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i);
|
||||||
|
@ -6194,7 +6243,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
||||||
if (NULL == pReq->topics) return -1;
|
if (NULL == pReq->topics) return -1;
|
||||||
for (int32_t i = 0; i < sz; ++i) {
|
for (int32_t i = 0; i < sz; ++i) {
|
||||||
TopicOffsetRows *data = taosArrayReserve(pReq->topics, 1);
|
TopicOffsetRows *data = taosArrayReserve(pReq->topics, 1);
|
||||||
tDecodeCStrTo(&decoder, data->topicName);
|
if (tDecodeCStrTo(&decoder, data->topicName) < 0) return -1;
|
||||||
int32_t szVgs = 0;
|
int32_t szVgs = 0;
|
||||||
if (tDecodeI32(&decoder, &szVgs) < 0) return -1;
|
if (tDecodeI32(&decoder, &szVgs) < 0) return -1;
|
||||||
if (szVgs > 0) {
|
if (szVgs > 0) {
|
||||||
|
@ -7753,36 +7802,6 @@ static int32_t tDecodeSVSubmitBlk(SDecoder *pCoder, SVSubmitBlk *pBlock, int32_t
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSVSubmitReq(SEncoder *pCoder, const SVSubmitReq *pReq) {
|
|
||||||
int32_t nBlocks = taosArrayGetSize(pReq->pArray);
|
|
||||||
|
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
|
||||||
|
|
||||||
if (tEncodeI32v(pCoder, pReq->flags) < 0) return -1;
|
|
||||||
if (tEncodeI32v(pCoder, nBlocks) < 0) return -1;
|
|
||||||
for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) {
|
|
||||||
if (tEncodeSVSubmitBlk(pCoder, (SVSubmitBlk *)taosArrayGet(pReq->pArray, iBlock), pReq->flags) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) {
|
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
|
|
||||||
if (tDecodeI32v(pCoder, &pReq->nBlocks) < 0) return -1;
|
|
||||||
pReq->pBlocks = tDecoderMalloc(pCoder, sizeof(SVSubmitBlk) * pReq->nBlocks);
|
|
||||||
if (pReq->pBlocks == NULL) return -1;
|
|
||||||
for (int32_t iBlock = 0; iBlock < pReq->nBlocks; iBlock++) {
|
|
||||||
if (tDecodeSVSubmitBlk(pCoder, pReq->pBlocks + iBlock, pReq->flags) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) {
|
static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
|
||||||
|
@ -8914,6 +8933,7 @@ int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStr
|
||||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->igUntreated) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->igUntreated) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->suspend) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -8928,6 +8948,7 @@ int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamR
|
||||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->igUntreated) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->igUntreated) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->suspend) < 0) return -1;
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
|
@ -30,7 +30,6 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
|
||||||
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname);
|
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname);
|
||||||
int32_t mndCheckViewPrivilege(SMnode *pMnode, const char *user, EOperType operType, const char *pViewFName);
|
int32_t mndCheckViewPrivilege(SMnode *pMnode, const char *user, EOperType operType, const char *pViewFName);
|
||||||
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic);
|
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic);
|
||||||
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName);
|
|
||||||
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname);
|
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname);
|
||||||
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
|
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
|
||||||
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp);
|
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp);
|
||||||
|
|
|
@ -102,7 +102,8 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
||||||
code = -1;
|
code = TSDB_CODE_MND_NO_RIGHTS;
|
||||||
|
terrno = TSDB_CODE_MND_NO_RIGHTS;
|
||||||
goto FAILED;
|
goto FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,22 +221,52 @@ FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char* user){
|
||||||
|
rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
|
||||||
|
if(rsp->topicPrivileges == NULL){
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++){
|
||||||
|
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||||
|
SMqTopicObj* pTopic = mndAcquireTopic(pMnode, topic);
|
||||||
|
if (pTopic == NULL) { // terrno has been set by callee function
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
|
||||||
|
if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) {
|
||||||
|
data->noPrivilege = 1;
|
||||||
|
} else{
|
||||||
|
data->noPrivilege = 0;
|
||||||
|
}
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
SMqHbReq req = {0};
|
SMqHbReq req = {0};
|
||||||
|
SMqHbRsp rsp = {0};
|
||||||
|
SMqConsumerObj *pConsumer = NULL;
|
||||||
|
|
||||||
if ((code = tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req)) < 0) {
|
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t consumerId = req.consumerId;
|
int64_t consumerId = req.consumerId;
|
||||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||||
if (pConsumer == NULL) {
|
if (pConsumer == NULL) {
|
||||||
mError("consumer:0x%" PRIx64 " not exist", consumerId);
|
mError("consumer:0x%" PRIx64 " not exist", consumerId);
|
||||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||||
code = -1;
|
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
code = checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user);
|
||||||
|
if(code != 0){
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -280,9 +311,22 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
mndReleaseSubscribe(pMnode, pSub);
|
mndReleaseSubscribe(pMnode, pSub);
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
// encode rsp
|
||||||
|
int32_t tlen = tSerializeSMqHbRsp(NULL, 0, &rsp);
|
||||||
|
void *buf = rpcMallocCont(tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
tSerializeSMqHbRsp(&buf, tlen, &rsp);
|
||||||
|
pMsg->info.rsp = buf;
|
||||||
|
pMsg->info.rspLen = tlen;
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
tDeatroySMqHbRsp(&rsp);
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
tDeatroySMqHbReq(&req);
|
tDeatroySMqHbReq(&req);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -500,6 +544,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
char *msgStr = pMsg->pCont;
|
char *msgStr = pMsg->pCont;
|
||||||
|
|
||||||
|
if(grantCheck(TSDB_GRANT_SUBSCRIBE) < 0){
|
||||||
|
terrno = TSDB_CODE_GRANT_EXPIRED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SCMSubscribeReq subscribe = {0};
|
SCMSubscribeReq subscribe = {0};
|
||||||
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
||||||
|
|
||||||
|
|
|
@ -30,9 +30,6 @@ int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType op
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; }
|
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; }
|
||||||
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteListRsp *pWhiteListRsp) {
|
int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteListRsp *pWhiteListRsp) {
|
||||||
|
|
|
@ -808,6 +808,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
int32_t sqlLen = 0;
|
int32_t sqlLen = 0;
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
|
||||||
|
terrno = TSDB_CODE_GRANT_STREAM_LIMITED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SCMCreateStreamReq createStreamReq = {0};
|
SCMCreateStreamReq createStreamReq = {0};
|
||||||
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
|
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -2034,17 +2039,22 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
|
|
||||||
SMResumeStreamReq pauseReq = {0};
|
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
|
||||||
if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
|
terrno = TSDB_CODE_GRANT_EXPIRED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMResumeStreamReq resumeReq = {0};
|
||||||
|
if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pStream = mndAcquireStream(pMnode, pauseReq.name);
|
pStream = mndAcquireStream(pMnode, resumeReq.name);
|
||||||
|
|
||||||
if (pStream == NULL) {
|
if (pStream == NULL) {
|
||||||
if (pauseReq.igNotExists) {
|
if (resumeReq.igNotExists) {
|
||||||
mInfo("stream:%s, not exist, if exist is set", pauseReq.name);
|
mInfo("stream:%s, not exist, if exist is set", resumeReq.name);
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2072,12 +2082,12 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_RESUME_NAME);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_RESUME_NAME);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr());
|
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name);
|
mInfo("trans:%d used to resume stream:%s", pTrans->id, resumeReq.name);
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
|
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||||
|
@ -2089,8 +2099,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||||
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
|
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
|
||||||
|
|
||||||
// resume all tasks
|
// resume all tasks
|
||||||
if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
|
if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) {
|
||||||
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
|
mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -3030,6 +3040,39 @@ static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
|
void* pIter = NULL;
|
||||||
|
while(1) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
if(pStream->status != STREAM_STATUS__PAUSE){
|
||||||
|
SMPauseStreamReq *reqPause = rpcMallocCont(sizeof(SMPauseStreamReq));
|
||||||
|
if (reqPause == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
strcpy(reqPause->name, pStream->name);
|
||||||
|
reqPause->igNotExists = 1;
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.msgType = TDMT_MND_PAUSE_STREAM,
|
||||||
|
.pCont = reqPause,
|
||||||
|
.contLen = sizeof(SMPauseStreamReq),
|
||||||
|
.info = *info,
|
||||||
|
};
|
||||||
|
|
||||||
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SStreamHbMsg req = {0};
|
SStreamHbMsg req = {0};
|
||||||
|
@ -3039,6 +3082,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
int64_t streamId = 0;
|
int64_t streamId = 0;
|
||||||
int32_t transId = 0;
|
int32_t transId = 0;
|
||||||
|
|
||||||
|
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
|
||||||
|
if(suspendAllStreams(pMnode, &pReq->info) < 0){
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
|
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
|
||||||
|
|
||||||
|
|
|
@ -1925,6 +1925,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosHashPut(pNewUser->topics, pTopic->name, len, pTopic->name, TSDB_TOPIC_FNAME_LEN);
|
taosHashPut(pNewUser->topics, pTopic->name, len, pTopic->name, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ALTER_USER_DEL_SUBSCRIBE_TOPIC_PRIV(pAlterReq->alterType, pAlterReq->privileges)) {
|
if (ALTER_USER_DEL_SUBSCRIBE_TOPIC_PRIV(pAlterReq->alterType, pAlterReq->privileges)) {
|
||||||
|
@ -1935,6 +1936,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosHashRemove(pNewUser->topics, pAlterReq->objname, len);
|
taosHashRemove(pNewUser->topics, pAlterReq->objname, len);
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -0,0 +1,142 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import time
|
||||||
|
|
||||||
|
import taos
|
||||||
|
from taos.tmq import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.common import *
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.sqlset import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
self.setsql = TDSetSql()
|
||||||
|
self.stbname = 'stb'
|
||||||
|
self.user_name = 'test'
|
||||||
|
self.binary_length = 20 # the length of binary for column_dict
|
||||||
|
self.nchar_length = 20 # the length of nchar for column_dict
|
||||||
|
self.dbnames = ['db1']
|
||||||
|
self.column_dict = {
|
||||||
|
'ts': 'timestamp',
|
||||||
|
'col1': 'float',
|
||||||
|
'col2': 'int',
|
||||||
|
'col3': 'float',
|
||||||
|
}
|
||||||
|
|
||||||
|
self.tag_dict = {
|
||||||
|
't1': 'int',
|
||||||
|
't2': f'binary({self.binary_length})'
|
||||||
|
}
|
||||||
|
|
||||||
|
self.tag_list = [
|
||||||
|
f'1, "Beijing"',
|
||||||
|
f'2, "Shanghai"',
|
||||||
|
f'3, "Guangzhou"',
|
||||||
|
f'4, "Shenzhen"'
|
||||||
|
]
|
||||||
|
|
||||||
|
self.values_list = [
|
||||||
|
f'now, 9.1, 200, 0.3'
|
||||||
|
]
|
||||||
|
|
||||||
|
self.tbnum = 4
|
||||||
|
self.topic_name = 'topic1'
|
||||||
|
|
||||||
|
|
||||||
|
def prepare_data(self):
|
||||||
|
for db in self.dbnames:
|
||||||
|
tdSql.execute(f"create database {db}")
|
||||||
|
tdSql.execute(f"use {db}")
|
||||||
|
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
|
||||||
|
for i in range(self.tbnum):
|
||||||
|
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
|
||||||
|
for j in self.values_list:
|
||||||
|
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
|
||||||
|
|
||||||
|
def consumeTest(self):
|
||||||
|
consumer_dict = {
|
||||||
|
"group.id": "g1",
|
||||||
|
"td.connect.user": self.user_name,
|
||||||
|
"td.connect.pass": "test",
|
||||||
|
"auto.offset.reset": "earliest"
|
||||||
|
}
|
||||||
|
consumer = Consumer(consumer_dict)
|
||||||
|
|
||||||
|
tdLog.debug("test subscribe topic created by other user")
|
||||||
|
exceptOccured = False
|
||||||
|
try:
|
||||||
|
consumer.subscribe([self.topic_name])
|
||||||
|
except TmqError:
|
||||||
|
exceptOccured = True
|
||||||
|
|
||||||
|
if not exceptOccured:
|
||||||
|
tdLog.exit(f"has no privilege, should except")
|
||||||
|
|
||||||
|
tdLog.debug("test subscribe topic privilege granted by other user")
|
||||||
|
tdSql.execute(f'grant subscribe on {self.topic_name} to {self.user_name}')
|
||||||
|
|
||||||
|
exceptOccured = False
|
||||||
|
try:
|
||||||
|
consumer.subscribe([self.topic_name])
|
||||||
|
except TmqError:
|
||||||
|
exceptOccured = True
|
||||||
|
|
||||||
|
if exceptOccured:
|
||||||
|
tdLog.exit(f"has privilege, should not except")
|
||||||
|
|
||||||
|
cnt = 0
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
res = consumer.poll(1)
|
||||||
|
cnt += 1
|
||||||
|
if cnt == 1:
|
||||||
|
if not res:
|
||||||
|
tdLog.exit(f"grant privilege, should get res")
|
||||||
|
elif cnt == 2:
|
||||||
|
if res:
|
||||||
|
time.sleep(1000)
|
||||||
|
tdLog.exit(f"revoke privilege, should get NULL")
|
||||||
|
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
tdLog.debug("test subscribe topic privilege revoked by other user")
|
||||||
|
tdSql.execute(f'revoke subscribe on {self.topic_name} from {self.user_name}')
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
consumer.close()
|
||||||
|
time.sleep(1000)
|
||||||
|
|
||||||
|
def create_user(self):
|
||||||
|
tdSql.execute(f'create topic {self.topic_name} as database {self.dbnames[0]}')
|
||||||
|
tdSql.execute(f'create user {self.user_name} pass "test"')
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.prepare_data()
|
||||||
|
self.create_user()
|
||||||
|
self.consumeTest()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue