diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index ed2ec0b6da..db727907b7 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG d5df76d + GIT_TAG 823fae5 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 70e1d73c7e..a84fa3c9f0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -120,6 +120,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_APPS, TSDB_MGMT_TABLE_STREAM_TASKS, + TSDB_MGMT_TABLE_PRIVILEGES, TSDB_MGMT_TABLE_MAX, } EShowType; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index d811eb7fec..719a6f82d2 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -139,6 +139,8 @@ typedef struct { int32_t numOfSTables; int32_t numOfCTables; int32_t numOfCreateSTables; + int32_t numOfAlterColSTables; + int32_t numOfAlterTagSTables; int64_t parseTime; int64_t schemaTime; @@ -512,6 +514,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { goto end; } + info->cost.numOfAlterTagSTables++; taosMemoryFreeClear(pTableMeta); code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1); if (code != TSDB_CODE_SUCCESS) { @@ -559,6 +562,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { goto end; } + info->cost.numOfAlterColSTables++; + taosMemoryFreeClear(pTableMeta); code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1); if (code != TSDB_CODE_SUCCESS) { goto end; @@ -820,11 +825,6 @@ static int8_t smlGetTsTypeByPrecision(int8_t precision) { } static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) { - void *tmp = taosMemoryCalloc(1, len + 1); - memcpy(tmp, data, len); - uDebug("SML:0x%" PRIx64 " smlParseInfluxTime tslen:%d, ts:%s", info->id, len, (char*)tmp); - taosMemoryFree(tmp); - if (len == 0 || (len == 1 && data[0] == '0')) { return taosGetTimestampNs(); } @@ -877,7 +877,10 @@ static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArra } uDebug("SML:0x%" PRIx64 " smlParseTS:%" PRId64, info->id, ts); - if (ts == -1) return TSDB_CODE_INVALID_TIMESTAMP; + if (ts <= 0) { + uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts); + return TSDB_CODE_INVALID_TIMESTAMP; + } // add ts to SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); @@ -2076,10 +2079,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { SSmlLineInfo elements = {0}; - void *tmp = taosMemoryCalloc(1, len + 1); - memcpy(tmp, sql, len); - uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? (char*)tmp : sql)); - taosMemoryFree(tmp); + uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? "rawdata" : sql)); int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { @@ -2372,11 +2372,12 @@ static int32_t smlInsertData(SSmlHandle *info) { static void smlPrintStatisticInfo(SSmlHandle *info) { uError("SML:0x%" PRIx64 - " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \ + " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \ parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64 "", info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, - info->cost.numOfCreateSTables, info->cost.schemaTime - info->cost.parseTime, + info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables, + info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime, info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime); } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index ceb452c551..16fe6c1b91 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -196,7 +196,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 743beb7f82..e15d7ac3df 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -464,7 +464,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index d0d2930abe..7ecf60dc2d 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -71,6 +71,9 @@ typedef enum { MND_OPER_READ_DB, MND_OPER_READ_OR_WRITE_DB, MND_OPER_SHOW_VARIBALES, + MND_OPER_SUBSCRIBE, + MND_OPER_CREATE_TOPIC, + MND_OPER_DROP_TOPIC, } EOperType; typedef enum { @@ -273,6 +276,7 @@ typedef struct { int32_t authVersion; SHashObj* readDbs; SHashObj* writeDbs; + SHashObj* topics; SRWLatch lock; } SUserObj; diff --git a/source/dnode/mnode/impl/inc/mndPrivilege.h b/source/dnode/mnode/impl/inc/mndPrivilege.h index dc88b25f51..dfde2f671e 100644 --- a/source/dnode/mnode/impl/inc/mndPrivilege.h +++ b/source/dnode/mnode/impl/inc/mndPrivilege.h @@ -28,6 +28,8 @@ void mndCleanupPrivilege(SMnode *pMnode); int32_t mndCheckOperPrivilege(SMnode *pMnode, const char *user, EOperType operType); int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType, SDbObj *pDb); int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname); +int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic); +int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName); int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname); int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter); int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp); diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index 9f516904ce..8cd669c769 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -25,7 +25,7 @@ extern "C" { int32_t mndInitTopic(SMnode *pMnode); void mndCleanupTopic(SMnode *pMnode); -SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName); +SMqTopicObj *mndAcquireTopic(SMnode *pMnode, const char *topicName); void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic); SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic); diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index ffa5dc6d0b..ce2ec7b23a 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -147,6 +147,8 @@ _OVER: static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SAcctObj *pAcct = NULL; + SSdbRow *pRow = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -156,10 +158,10 @@ static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) { goto _OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj)); + pRow = sdbAllocRow(sizeof(SAcctObj)); if (pRow == NULL) goto _OVER; - SAcctObj *pAcct = sdbGetRowObj(pRow); + pAcct = sdbGetRowObj(pRow); if (pAcct == NULL) goto _OVER; int32_t dataPos = 0; @@ -186,7 +188,7 @@ static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("acct:%s, failed to decode from raw:%p since %s", pAcct->acct, pRaw, terrstr()); + mError("acct:%s, failed to decode from raw:%p since %s", pAcct == NULL ? "null" : pAcct->acct, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 348c8f4cb8..ca03207d2b 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -157,6 +157,8 @@ _OVER: static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SClusterObj *pCluster = NULL; + SSdbRow *pRow = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -166,10 +168,10 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { goto _OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj)); + pRow = sdbAllocRow(sizeof(SClusterObj)); if (pRow == NULL) goto _OVER; - SClusterObj *pCluster = sdbGetRowObj(pRow); + pCluster = sdbGetRowObj(pRow); if (pCluster == NULL) goto _OVER; int32_t dataPos = 0; @@ -184,7 +186,8 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster->id, pRaw, terrstr()); + mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster == NULL ? 0 : pCluster->id, pRaw, + terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 4397ea0751..76bff9a70f 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -712,7 +712,9 @@ CM_ENCODE_OVER: SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; - void *buf = NULL; + SSdbRow *pRow = NULL; + SMqConsumerObj *pConsumer = NULL; + void *buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER; @@ -722,10 +724,10 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { goto CM_DECODE_OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj)); + pRow = sdbAllocRow(sizeof(SMqConsumerObj)); if (pRow == NULL) goto CM_DECODE_OVER; - SMqConsumerObj *pConsumer = sdbGetRowObj(pRow); + pConsumer = sdbGetRowObj(pRow); if (pConsumer == NULL) goto CM_DECODE_OVER; int32_t dataPos = 0; @@ -745,7 +747,8 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { CM_DECODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { - mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); + mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s", pConsumer == NULL ? 0 : pConsumer->consumerId, + pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c5d5687ade..5fa86dffe2 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -147,6 +147,8 @@ _OVER: static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SDbObj *pDb = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -156,10 +158,10 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { goto _OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SDbObj)); + pRow = sdbAllocRow(sizeof(SDbObj)); if (pRow == NULL) goto _OVER; - SDbObj *pDb = sdbGetRowObj(pRow); + pDb = sdbGetRowObj(pRow); if (pDb == NULL) goto _OVER; int32_t dataPos = 0; @@ -232,7 +234,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("db:%s, failed to decode from raw:%p since %s", pDb->name, pRaw, terrstr()); + mError("db:%s, failed to decode from raw:%p since %s", pDb == NULL ? "null" : pDb->name, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 49887c0dab..250a302cc4 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -154,8 +154,9 @@ _OVER: } static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { - SSdbRow *pRow = NULL; terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SDnodeObj *pDnode = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -166,7 +167,8 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { pRow = sdbAllocRow(sizeof(SDnodeObj)); if (pRow == NULL) goto _OVER; - SDnodeObj *pDnode = sdbGetRowObj(pRow); + + pDnode = sdbGetRowObj(pRow); if (pDnode == NULL) goto _OVER; int32_t dataPos = 0; @@ -181,7 +183,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("dnode:%d, failed to decode from raw:%p since %s", pDnode->id, pRaw, terrstr()); + mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index c9b22fad3a..31f31a15ba 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -101,6 +101,8 @@ _OVER: static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SFuncObj *pFunc = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -110,10 +112,10 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { goto _OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SFuncObj)); + pRow = sdbAllocRow(sizeof(SFuncObj)); if (pRow == NULL) goto _OVER; - SFuncObj *pFunc = sdbGetRowObj(pRow); + pFunc = sdbGetRowObj(pRow); if (pFunc == NULL) goto _OVER; int32_t dataPos = 0; @@ -148,7 +150,7 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("func:%s, failed to decode from raw:%p since %s", pFunc->name, pRaw, terrstr()); + mError("func:%s, failed to decode from raw:%p since %s", pFunc == NULL ? "null" : pFunc->name, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 7c86a5be22..2620100b6c 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -142,6 +142,8 @@ _OVER: static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SMnodeObj *pObj = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; @@ -151,10 +153,10 @@ static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) { goto _OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj)); + pRow = sdbAllocRow(sizeof(SMnodeObj)); if (pRow == NULL) goto _OVER; - SMnodeObj *pObj = sdbGetRowObj(pRow); + pObj = sdbGetRowObj(pRow); if (pObj == NULL) goto _OVER; int32_t dataPos = 0; @@ -167,7 +169,7 @@ static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("mnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr()); + mError("mnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndPrivilege.c b/source/dnode/mnode/impl/src/mndPrivilege.c index 151a2a6404..ccb4140b83 100644 --- a/source/dnode/mnode/impl/src/mndPrivilege.c +++ b/source/dnode/mnode/impl/src/mndPrivilege.c @@ -28,6 +28,10 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname) { return 0; } +int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; } +int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) { + return 0; +} int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp) { memcpy(pRsp->user, pUser->user, TSDB_USER_LEN); pRsp->superAuth = 1; diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 434e6fbc52..26e7f72cf7 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -100,6 +100,8 @@ _OVER: static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SQnodeObj *pObj = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -109,10 +111,10 @@ static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw) { goto _OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SQnodeObj)); + pRow = sdbAllocRow(sizeof(SQnodeObj)); if (pRow == NULL) goto _OVER; - SQnodeObj *pObj = sdbGetRowObj(pRow); + pObj = sdbGetRowObj(pRow); if (pObj == NULL) goto _OVER; int32_t dataPos = 0; @@ -125,7 +127,7 @@ static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("qnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr()); + mError("qnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 698c07d9bc..7be2d48c70 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -132,6 +132,8 @@ _OVER: static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SSmaObj *pSma = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -141,10 +143,10 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) { goto _OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SSmaObj)); + pRow = sdbAllocRow(sizeof(SSmaObj)); if (pRow == NULL) goto _OVER; - SSmaObj *pSma = sdbGetRowObj(pRow); + pSma = sdbGetRowObj(pRow); if (pSma == NULL) goto _OVER; int32_t dataPos = 0; @@ -200,7 +202,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr()); + mError("sma:%s, failed to decode from raw:%p since %s", pSma == NULL ? "null" : pSma->name, pRaw, terrstr()); taosMemoryFreeClear(pSma->expr); taosMemoryFreeClear(pSma->tagsFilter); taosMemoryFreeClear(pSma->sql); diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 7753e5b127..a31801f376 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -105,6 +105,8 @@ _OVER: static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SSnodeObj *pObj = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -114,10 +116,10 @@ static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) { goto _OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SSnodeObj)); + pRow = sdbAllocRow(sizeof(SSnodeObj)); if (pRow == NULL) goto _OVER; - SSnodeObj *pObj = sdbGetRowObj(pRow); + pObj = sdbGetRowObj(pRow); if (pObj == NULL) goto _OVER; int32_t dataPos = 0; @@ -130,7 +132,7 @@ static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("snode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr()); + mError("snode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 6dad7d74c8..2ed4fde509 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -162,6 +162,8 @@ _OVER: static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SStbObj *pStb = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -171,10 +173,10 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { goto _OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SStbObj)); + pRow = sdbAllocRow(sizeof(SStbObj)); if (pRow == NULL) goto _OVER; - SStbObj *pStb = sdbGetRowObj(pRow); + pStb = sdbGetRowObj(pRow); if (pStb == NULL) goto _OVER; int32_t dataPos = 0; @@ -254,10 +256,12 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("stb:%s, failed to decode from raw:%p since %s", pStb->name, pRaw, terrstr()); - taosMemoryFreeClear(pStb->pColumns); - taosMemoryFreeClear(pStb->pTags); - taosMemoryFreeClear(pStb->comment); + mError("stb:%s, failed to decode from raw:%p since %s", pStb == NULL ? "null" : pStb->name, pRaw, terrstr()); + if (pStb != NULL) { + taosMemoryFreeClear(pStb->pColumns); + taosMemoryFreeClear(pStb->pTags); + taosMemoryFreeClear(pStb->comment); + } taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d8cf7a837e..44ff8733fd 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -120,7 +120,9 @@ STREAM_ENCODE_OVER: SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; - void *buf = NULL; + SSdbRow *pRow = NULL; + SStreamObj *pStream = NULL; + void *buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER; @@ -130,11 +132,10 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { goto STREAM_DECODE_OVER; } - int32_t size = sizeof(SStreamObj); - SSdbRow *pRow = sdbAllocRow(size); + pRow = sdbAllocRow(sizeof(SStreamObj)); if (pRow == NULL) goto STREAM_DECODE_OVER; - SStreamObj *pStream = sdbGetRowObj(pRow); + pStream = sdbGetRowObj(pRow); if (pStream == NULL) goto STREAM_DECODE_OVER; int32_t tlen; @@ -157,7 +158,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { STREAM_DECODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { - mError("stream:%s, failed to decode from raw:%p since %s", pStream->name, pRaw, terrstr()); + mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ffb46e5f1b..3cf5e17cd3 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -743,7 +743,9 @@ SUB_ENCODE_OVER: static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; - void *buf = NULL; + SSdbRow *pRow = NULL; + SMqSubscribeObj *pSub = NULL; + void *buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER; @@ -753,11 +755,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { goto SUB_DECODE_OVER; } - int32_t size = sizeof(SMqSubscribeObj); - SSdbRow *pRow = sdbAllocRow(size); + pRow = sdbAllocRow(sizeof(SMqSubscribeObj)); if (pRow == NULL) goto SUB_DECODE_OVER; - SMqSubscribeObj *pSub = sdbGetRowObj(pRow); + pSub = sdbGetRowObj(pRow); if (pSub == NULL) goto SUB_DECODE_OVER; int32_t dataPos = 0; @@ -777,7 +778,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { SUB_DECODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { - mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); + mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 6412761f0b..eea74c6dd8 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -152,8 +152,10 @@ TOPIC_ENCODE_OVER: SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SMqTopicObj *pTopic = NULL; + void *buf = NULL; - void *buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER; @@ -162,11 +164,10 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { goto TOPIC_DECODE_OVER; } - int32_t size = sizeof(SMqTopicObj); - SSdbRow *pRow = sdbAllocRow(size); + pRow = sdbAllocRow(sizeof(SMqTopicObj)); if (pRow == NULL) goto TOPIC_DECODE_OVER; - SMqTopicObj *pTopic = sdbGetRowObj(pRow); + pTopic = sdbGetRowObj(pRow); if (pTopic == NULL) goto TOPIC_DECODE_OVER; int32_t len; @@ -251,7 +252,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { TOPIC_DECODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { - mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr()); + mError("topic:%s, failed to decode from raw:%p since %s", pTopic == NULL ? "null" : pTopic->name, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } @@ -288,7 +289,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic return 0; } -SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) { +SMqTopicObj *mndAcquireTopic(SMnode *pMnode, const char *topicName) { SSdb *pSdb = pMnode->pSdb; SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName); if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { @@ -573,7 +574,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { goto _OVER; } - if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pDb) != 0) { + if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC) != 0) { goto _OVER; } @@ -633,6 +634,11 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } } + if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC) != 0) { + mndReleaseTopic(pMnode, pTopic); + return -1; + } + void *pIter = NULL; SMqConsumerObj *pConsumer; while (1) { diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 2f50ab04b8..5f34adce77 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -18,10 +18,11 @@ #include "mndDb.h" #include "mndPrivilege.h" #include "mndShow.h" +#include "mndTopic.h" #include "mndTrans.h" #include "tbase64.h" -#define USER_VER_NUMBER 1 +#define USER_VER_NUMBER 2 #define USER_RESERVE_SIZE 64 static int32_t mndCreateDefaultUsers(SMnode *pMnode); @@ -36,6 +37,8 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq); static int32_t mndProcessGetUserAuthReq(SRpcMsg *pReq); static int32_t mndRetrieveUsers(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextUser(SMnode *pMnode, void *pIter); +static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter); int32_t mndInitUser(SMnode *pMnode) { SSdbTable table = { @@ -56,6 +59,8 @@ int32_t mndInitUser(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_USER, mndRetrieveUsers); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_USER, mndCancelGetNextUser); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_PRIVILEGES, mndRetrievePrivileges); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_PRIVILEGES, mndCancelGetNextPrivileges); return sdbSetTable(pMnode->pSdb, table); } @@ -119,7 +124,9 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs); int32_t numOfWriteDbs = taosHashGetSize(pUser->writeDbs); - int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN; + int32_t numOfTopics = taosHashGetSize(pUser->topics); + int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN + + numOfTopics * TSDB_TOPIC_FNAME_LEN; SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, USER_VER_NUMBER, size); if (pRaw == NULL) goto _OVER; @@ -137,6 +144,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { SDB_SET_INT32(pRaw, dataPos, pUser->authVersion, _OVER) SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, _OVER) SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, _OVER) + SDB_SET_INT32(pRaw, dataPos, numOfTopics, _OVER) char *db = taosHashIterate(pUser->readDbs, NULL); while (db != NULL) { @@ -150,6 +158,12 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { db = taosHashIterate(pUser->writeDbs, db); } + char *topic = taosHashIterate(pUser->topics, NULL); + while (topic != NULL) { + SDB_SET_BINARY(pRaw, dataPos, topic, TSDB_TOPIC_FNAME_LEN, _OVER); + db = taosHashIterate(pUser->topics, topic); + } + SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) @@ -168,19 +182,21 @@ _OVER: static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SUserObj *pUser = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != USER_VER_NUMBER) { + if (sver != 1 && sver != 2) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; goto _OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj)); + pRow = sdbAllocRow(sizeof(SUserObj)); if (pRow == NULL) goto _OVER; - SUserObj *pUser = sdbGetRowObj(pRow); + pUser = sdbGetRowObj(pRow); if (pUser == NULL) goto _OVER; int32_t dataPos = 0; @@ -197,12 +213,18 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { int32_t numOfReadDbs = 0; int32_t numOfWriteDbs = 0; + int32_t numOfTopics = 0; SDB_GET_INT32(pRaw, dataPos, &numOfReadDbs, _OVER) SDB_GET_INT32(pRaw, dataPos, &numOfWriteDbs, _OVER) + if (sver >= 2) { + SDB_GET_INT32(pRaw, dataPos, &numOfTopics, _OVER) + } + pUser->readDbs = taosHashInit(numOfReadDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); pUser->writeDbs = taosHashInit(numOfWriteDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (pUser->readDbs == NULL || pUser->writeDbs == NULL) goto _OVER; + pUser->topics = taosHashInit(numOfTopics, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (pUser->readDbs == NULL || pUser->writeDbs == NULL || pUser->topics == NULL) goto _OVER; for (int32_t i = 0; i < numOfReadDbs; ++i) { char db[TSDB_DB_FNAME_LEN] = {0}; @@ -218,6 +240,15 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { taosHashPut(pUser->writeDbs, db, len, db, TSDB_DB_FNAME_LEN); } + if (sver >= 2) { + for (int32_t i = 0; i < numOfTopics; ++i) { + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + SDB_GET_BINARY(pRaw, dataPos, topic, TSDB_TOPIC_FNAME_LEN, _OVER) + int32_t len = strlen(topic) + 1; + taosHashPut(pUser->topics, topic, len, topic, TSDB_TOPIC_FNAME_LEN); + } + } + SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) taosInitRWLatch(&pUser->lock); @@ -225,9 +256,12 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("user:%s, failed to decode from raw:%p since %s", pUser->user, pRaw, terrstr()); - taosHashCleanup(pUser->readDbs); - taosHashCleanup(pUser->writeDbs); + mError("user:%s, failed to decode from raw:%p since %s", pUser == NULL ? "null" : pUser->user, pRaw, terrstr()); + if (pUser != NULL) { + taosHashCleanup(pUser->readDbs); + taosHashCleanup(pUser->writeDbs); + taosHashCleanup(pUser->topics); + } taosMemoryFreeClear(pRow); return NULL; } @@ -255,8 +289,10 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { mTrace("user:%s, perform delete action, row:%p", pUser->user, pUser); taosHashCleanup(pUser->readDbs); taosHashCleanup(pUser->writeDbs); + taosHashCleanup(pUser->topics); pUser->readDbs = NULL; pUser->writeDbs = NULL; + pUser->topics = NULL; return 0; } @@ -270,6 +306,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); TSWAP(pOld->readDbs, pNew->readDbs); TSWAP(pOld->writeDbs, pNew->writeDbs); + TSWAP(pOld->topics, pNew->topics); taosWUnLockLatch(&pOld->lock); return 0; @@ -482,9 +519,10 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { taosRLockLatch(&pUser->lock); newUser.readDbs = mndDupDbHash(pUser->readDbs); newUser.writeDbs = mndDupDbHash(pUser->writeDbs); + newUser.topics = mndDupDbHash(pUser->topics); taosRUnLockLatch(&pUser->lock); - if (newUser.readDbs == NULL || newUser.writeDbs == NULL) { + if (newUser.readDbs == NULL || newUser.writeDbs == NULL || newUser.topics == NULL) { goto _OVER; } @@ -582,6 +620,26 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { } } + if (alterReq.alterType == TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC) { + int32_t len = strlen(alterReq.objname) + 1; + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, alterReq.objname); + if (pTopic == NULL) { + mndReleaseTopic(pMnode, pTopic); + goto _OVER; + } + taosHashPut(newUser.topics, pTopic->name, len, pTopic->name, TSDB_TOPIC_FNAME_LEN); + } + + if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC) { + int32_t len = strlen(alterReq.objname) + 1; + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, alterReq.objname); + if (pTopic == NULL) { + mndReleaseTopic(pMnode, pTopic); + goto _OVER; + } + taosHashRemove(newUser.topics, alterReq.objname, len); + } + code = mndAlterUser(pMnode, pUser, &newUser, pReq); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; @@ -594,6 +652,7 @@ _OVER: mndReleaseUser(pMnode, pUser); taosHashCleanup(newUser.writeDbs); taosHashCleanup(newUser.readDbs); + taosHashCleanup(newUser.topics); return code; } @@ -756,6 +815,108 @@ static void mndCancelGetNextUser(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } +static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SUserObj *pUser = NULL; + int32_t cols = 0; + char *pWrite; + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_USER, pShow->pIter, (void **)&pUser); + if (pShow->pIter == NULL) break; + + int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs); + int32_t numOfWriteDbs = taosHashGetSize(pUser->writeDbs); + int32_t numOfTopics = taosHashGetSize(pUser->topics); + if (numOfRows + numOfReadDbs + numOfWriteDbs + numOfTopics >= rows) break; + + char *db = taosHashIterate(pUser->readDbs, NULL); + while (db != NULL) { + cols = 0; + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes); + colDataAppend(pColInfo, numOfRows, (const char *)userName, false); + + char privilege[20] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(privilege, "read", pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)privilege, false); + + SName name = {0}; + char objName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + tNameFromString(&name, db, T_NAME_ACCT | T_NAME_DB); + tNameGetDbName(&name, varDataVal(objName)); + varDataSetLen(objName, strlen(varDataVal(objName))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)objName, false); + + numOfRows++; + db = taosHashIterate(pUser->readDbs, db); + } + + db = taosHashIterate(pUser->writeDbs, NULL); + while (db != NULL) { + cols = 0; + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes); + colDataAppend(pColInfo, numOfRows, (const char *)userName, false); + + char privilege[20] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(privilege, "write", pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)privilege, false); + + SName name = {0}; + char objName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + tNameFromString(&name, db, T_NAME_ACCT | T_NAME_DB); + tNameGetDbName(&name, varDataVal(objName)); + varDataSetLen(objName, strlen(varDataVal(objName))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)objName, false); + + numOfRows++; + db = taosHashIterate(pUser->writeDbs, db); + } + + char *topic = taosHashIterate(pUser->topics, NULL); + while (topic != NULL) { + cols = 0; + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes); + colDataAppend(pColInfo, numOfRows, (const char *)userName, false); + + char privilege[20] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(privilege, "subscribe", pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)privilege, false); + + char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0}; + tstrncpy(varDataVal(topicName), mndGetDbStr(topic), TSDB_TOPIC_NAME_LEN - 2); + varDataSetLen(topicName, strlen(varDataVal(topicName))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)topicName, false); + + numOfRows++; + db = taosHashIterate(pUser->topics, topic); + } + + sdbRelease(pSdb, pUser); + } + + pShow->numOfRows += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} + int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen) { SUserAuthBatchRsp batchRsp = {0}; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index eec112d225..544215a905 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -113,6 +113,8 @@ _OVER: SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SVgObj *pVgroup = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -122,10 +124,10 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { goto _OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SVgObj)); + pRow = sdbAllocRow(sizeof(SVgObj)); if (pRow == NULL) goto _OVER; - SVgObj *pVgroup = sdbGetRowObj(pRow); + pVgroup = sdbGetRowObj(pRow); if (pVgroup == NULL) goto _OVER; int32_t dataPos = 0; @@ -152,7 +154,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { _OVER: if (terrno != 0) { - mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup->vgId, pRaw, terrstr()); + mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr()); taosMemoryFreeClear(pRow); return NULL; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 6d8de0b79b..6c5c1718b2 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -418,7 +418,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,n,system-test,python3 ./test.py -f 0-others/compatibility.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py -#,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py diff --git a/tests/script/tsim/user/privilege_db.sim b/tests/script/tsim/user/privilege_db.sim index 83933e0e47..79e3754c2f 100644 --- a/tests/script/tsim/user/privilege_db.sim +++ b/tests/script/tsim/user/privilege_db.sim @@ -5,6 +5,10 @@ sql connect print =============== create db sql create database d1 vgroups 1; +sql use d1 +sql create table stb (ts timestamp, i int) tags (j int) +sql create topic topic_1 as select ts, i from stb + sql create database d2 vgroups 1; sql create database d3 vgroups 1; sql select * from information_schema.ins_databases @@ -89,5 +93,6 @@ sql_error drop database d2; sql_error create stable d1.st (ts timestamp, i int) tags (j int) sql create stable d2.st (ts timestamp, i int) tags (j int) +sql_error create topic topic_2 as select ts, i from stb system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index 7f52c754cf..cd4acae438 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -62,7 +62,7 @@ class TDTestCase: # os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so.1 {self.getBuildPath()}/build/lib/libtaos.so.1_bak ") packagePath = "/usr/local/src/" - + dataPath = cPath + "/../data/" packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz" packageTPath = packageName.split("-Linux-")[0] my_file = Path(f"{packagePath}/{packageName}") @@ -71,22 +71,21 @@ class TDTestCase: os.system(f"cd {packagePath} && wget https://www.tdengine.com/assets-download/3.0/{packageName}") else: print(f"{packageName} has been exists") - os.system(f"cd {packagePath} && tar xvf {packageName} && cd {packageTPath} && ./install.sh -e no " ) + os.system(f" cd {packagePath} && tar xvf {packageName} && cd {packageTPath} && ./install.sh -e no " ) tdDnodes.stop(1) - print(f"start taosd: nohup taosd -c {cPath} & ") - os.system(f" nohup taosd -c {cPath} & " ) + print(f"start taosd: rm -rf {dataPath}/* && nohup taosd -c {cPath} & ") + os.system(f"rm -rf {dataPath}/* && nohup taosd -c {cPath} & " ) sleep(5) - def buildTaosd(self,bPath): # os.system(f"mv {bPath}/build_bak {bPath}/build ") os.system(f" cd {bPath} && make install ") def run(self): - bPath=self.getBuildPath() - cPath=self.getCfgPath() + bPath = self.getBuildPath() + cPath = self.getCfgPath() dbname = "test" stb = f"{dbname}.meters" self.installTaosd(bPath,cPath) @@ -94,6 +93,7 @@ class TDTestCase: tableNumbers=100 recordNumbers1=100 recordNumbers2=1000 + # tdsqlF=tdCom.newTdSql() # print(tdsqlF) # tdsqlF.query(f"SELECT SERVER_VERSION();") @@ -105,6 +105,7 @@ class TDTestCase: # oldClientVersion=tdsqlF.queryResult[0][0] # tdLog.info(f"Base client version is {oldClientVersion}") # baseVersion = "3.0.1.8" + tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}") tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")