diff --git a/include/common/systable.h b/include/common/systable.h index 506bdcfa8a..e36beb13f2 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -45,7 +45,6 @@ extern "C" { #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFS_TABLE_SMAS "smas" -#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" #define TSDB_PERFS_TABLE_CONNECTIONS "connections" #define TSDB_PERFS_TABLE_QUERIES "queries" #define TSDB_PERFS_TABLE_TOPICS "topics" diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 72418148b0..9ec80293a8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -99,7 +99,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_VGROUP, TSDB_MGMT_TABLE_TOPICS, TSDB_MGMT_TABLE_CONSUMERS, - TSDB_MGMT_TABLE_SUBSCRIBES, + TSDB_MGMT_TABLE_SUBSCRIPTIONS, TSDB_MGMT_TABLE_TRANS, TSDB_MGMT_TABLE_SMAS, TSDB_MGMT_TABLE_CONFIGS, @@ -131,12 +131,10 @@ typedef enum _mgmt_table { #define TSDB_ALTER_USER_SUPERUSER 0x2 #define TSDB_ALTER_USER_ADD_READ_DB 0x3 #define TSDB_ALTER_USER_REMOVE_READ_DB 0x4 -#define TSDB_ALTER_USER_CLEAR_READ_DB 0x5 -#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6 -#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7 -#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8 -#define TSDB_ALTER_USER_ADD_ALL_DB 0x9 -#define TSDB_ALTER_USER_REMOVE_ALL_DB 0xA +#define TSDB_ALTER_USER_ADD_WRITE_DB 0x5 +#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x6 +#define TSDB_ALTER_USER_ADD_ALL_DB 0x7 +#define TSDB_ALTER_USER_REMOVE_ALL_DB 0x8 #define TSDB_ALTER_USER_PRIVILEGES 0x2 diff --git a/include/os/osFile.h b/include/os/osFile.h index b364d233ef..5ba161270d 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -80,6 +80,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count); void taosFprintfFile(TdFilePtr pFile, const char *format, ...); int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict ptrBuf); +int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf); int32_t taosEOFFile(TdFilePtr pFile); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index e89551bb7d..5ff0282c87 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -274,8 +274,8 @@ static const SSysDbTableSchema consumerSchema[] = { }; static const SSysDbTableSchema subscriptionSchema[] = { - {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, }; diff --git a/source/dnode/mnode/impl/inc/mndAuth.h b/source/dnode/mnode/impl/inc/mndAuth.h index 890879912f..de59a11cd7 100644 --- a/source/dnode/mnode/impl/inc/mndAuth.h +++ b/source/dnode/mnode/impl/inc/mndAuth.h @@ -26,7 +26,7 @@ int32_t mndInitAuth(SMnode *pMnode); void mndCleanupAuth(SMnode *pMnode); int32_t mndCheckCreateUserAuth(SUserObj *pOperUser); -int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, SAlterUserReq *pAlter); +int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter); int32_t mndCheckDropUserAuth(SUserObj *pOperUser); int32_t mndCheckNodeAuth(SUserObj *pOperUser); diff --git a/source/dnode/mnode/impl/src/mndAuth.c b/source/dnode/mnode/impl/src/mndAuth.c index 8e5ec40c47..1d89241dd5 100644 --- a/source/dnode/mnode/impl/src/mndAuth.c +++ b/source/dnode/mnode/impl/src/mndAuth.c @@ -79,14 +79,12 @@ int32_t mndCheckCreateUserAuth(SUserObj *pOperUser) { return -1; } -int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, SAlterUserReq *pAlter) { +int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter) { if (pAlter->alterType == TSDB_ALTER_USER_PASSWD) { if (pOperUser->superUser || strcmp(pUser->user, pOperUser->user) == 0) { return 0; } - } - - if (pAlter->alterType == TSDB_ALTER_USER_SUPERUSER) { + } else if (pAlter->alterType == TSDB_ALTER_USER_SUPERUSER) { if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) { terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; @@ -95,21 +93,12 @@ int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, if (pOperUser->superUser) { return 0; } - } - - if (pAlter->alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB || pAlter->alterType == TSDB_ALTER_USER_CLEAR_READ_DB) { + } else { if (pOperUser->superUser) { return 0; } } - if (pAlter->alterType == TSDB_ALTER_USER_ADD_READ_DB || pAlter->alterType == TSDB_ALTER_USER_REMOVE_READ_DB || - pAlter->alterType == TSDB_ALTER_USER_ADD_WRITE_DB || pAlter->alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) { - if (pOperUser->superUser || strcmp(pUser->user, pDb->createUser) == 0) { - return 0; - } - } - terrno = TSDB_CODE_MND_NO_RIGHTS; return -1; } diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index be333d154a..b44c8c932b 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -85,8 +85,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_VGROUP; } else if (strncasecmp(name, TSDB_PERFS_TABLE_CONSUMERS, len) == 0) { type = TSDB_MGMT_TABLE_CONSUMERS; - } else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIBES, len) == 0) { - type = TSDB_MGMT_TABLE_SUBSCRIBES; + } else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIPTIONS, len) == 0) { + type = TSDB_MGMT_TABLE_SUBSCRIPTIONS; } else if (strncasecmp(name, TSDB_PERFS_TABLE_TRANS, len) == 0) { type = TSDB_MGMT_TABLE_TRANS; } else if (strncasecmp(name, TSDB_PERFS_TABLE_SMAS, len) == 0) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 545caea03d..c947a1913e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -44,6 +44,9 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg); +static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); + static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pRedoRaw = mndSubActionEncode(pSub); if (pRedoRaw == NULL) return -1; @@ -71,6 +74,10 @@ int32_t mndInitSubscribe(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); + + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe); + return sdbSetTable(pMnode->pSdb, table); } @@ -706,3 +713,124 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { END: return code; } + +static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { + SMnode *pMnode = pReq->pNode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SMqSubscribeObj *pSub = NULL; + + while (numOfRows < rowsCapacity) { + pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub); + if (pShow->pIter == NULL) break; + + taosRLockLatch(&pSub->lock); + + if (numOfRows + pSub->vgNum > rowsCapacity) { + blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum); + } + + SMqConsumerEp *pConsumerEp = NULL; + void *pIter = NULL; + while (1) { + pIter = taosHashIterate(pSub->consumerHash, pIter); + if (pIter == NULL) break; + pConsumerEp = (SMqConsumerEp *)pIter; + + int32_t sz = taosArrayGetSize(pConsumerEp->vgs); + for (int32_t j = 0; j < sz; j++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // topic and cgroup + char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + mndSplitSubscribeKey(pSub->key, topic, cgroup); + varDataSetLen(topic, strlen(varDataVal(topic))); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)topic, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); + + // vg id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); + + // consumer id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false); + + // offset +#if 0 + // subscribe time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); + + // rebalance time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); +#endif + + numOfRows++; + } + } + + int32_t sz = taosArrayGetSize(pSub->unassignedVgs); + for (int32_t i = 0; i < sz; i++) { + SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // topic and cgroup + char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + mndSplitSubscribeKey(pSub->key, topic, cgroup); + varDataSetLen(topic, strlen(varDataVal(topic))); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)topic, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); + + // vg id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); + + // consumer id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, NULL, true); + + // offset +#if 0 + // subscribe time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); + + // rebalance time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); +#endif + + numOfRows++; + } + + taosRUnLockLatch(&pSub->lock); + sdbRelease(pSdb, pSub); + } + + pShow->numOfRows += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 63d429df9e..01149f793f 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -35,6 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp); + static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index d0af17ff5c..1706820bdc 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -394,6 +394,8 @@ static SHashObj *mndDupDbHash(SHashObj *pOld) { static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; int32_t code = -1; SUserObj *pUser = NULL; SUserObj *pOperUser = NULL; @@ -429,7 +431,13 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { goto _OVER; } + if (mndCheckAlterUserAuth(pOperUser, pUser, &alterReq) != 0) { + goto _OVER; + } + memcpy(&newUser, pUser, sizeof(SUserObj)); + newUser.authVersion++; + newUser.updateTime = taosGetTimestampMs(); taosRLockLatch(&pUser->lock); newUser.readDbs = mndDupDbHash(pUser->readDbs); @@ -440,63 +448,90 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { goto _OVER; } - int32_t len = strlen(alterReq.dbname) + 1; - SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); - mndReleaseDb(pMnode, pDb); - if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) { char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass); memcpy(newUser.pass, pass, TSDB_PASSWORD_LEN); - } else if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) { - newUser.superUser = alterReq.superUser; - } else if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB) { - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; - } - if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB) { - if (taosHashRemove(newUser.readDbs, alterReq.dbname, len) != 0) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; - } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_READ_DB) { - taosHashClear(newUser.readDbs); - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB) { - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; - } - if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) { - if (taosHashRemove(newUser.writeDbs, alterReq.dbname, len) != 0) { - terrno = TSDB_CODE_MND_DB_NOT_EXIST; - goto _OVER; - } - newUser.authVersion++; - } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB) { - taosHashClear(newUser.writeDbs); - newUser.authVersion++; - } else { - terrno = TSDB_CODE_MND_INVALID_ALTER_OPER; - goto _OVER; } - newUser.updateTime = taosGetTimestampMs(); + if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) { + newUser.superUser = alterReq.superUser; + } - if (mndCheckAlterUserAuth(pOperUser, pUser, pDb, &alterReq) != 0) { - goto _OVER; + if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + } else { + while (1) { + SDbObj *pDb = NULL; + pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb); + if (pIter == NULL) break; + int32_t len = strlen(pDb->name) + 1; + taosHashPut(newUser.readDbs, pDb->name, len, pDb->name, TSDB_DB_FNAME_LEN); + sdbRelease(pSdb, pDb); + } + } + } + + if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + } else { + while (1) { + SDbObj *pDb = NULL; + pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb); + if (pIter == NULL) break; + int32_t len = strlen(pDb->name) + 1; + taosHashPut(newUser.writeDbs, pDb->name, len, pDb->name, TSDB_DB_FNAME_LEN); + sdbRelease(pSdb, pDb); + } + } + } + + if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + taosHashRemove(newUser.readDbs, alterReq.dbname, len); + } else { + taosHashClear(newUser.readDbs); + } + } + + if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_DB) { + if (strcmp(alterReq.dbname, "*") != 0) { + int32_t len = strlen(alterReq.dbname) + 1; + SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname); + if (pDb == NULL) { + mndReleaseDb(pMnode, pDb); + goto _OVER; + } + taosHashRemove(newUser.writeDbs, alterReq.dbname, len); + } else { + taosHashClear(newUser.writeDbs); + } } code = mndAlterUser(pMnode, pUser, &newUser, pReq); diff --git a/source/dnode/mnode/impl/test/user/user.cpp b/source/dnode/mnode/impl/test/user/user.cpp index ee961e9a27..1e03d8ff4a 100644 --- a/source/dnode/mnode/impl/test/user/user.cpp +++ b/source/dnode/mnode/impl/test/user/user.cpp @@ -238,9 +238,10 @@ TEST_F(MndTestUser, 03_Alter_User) { { SAlterUserReq alterReq = {0}; - alterReq.alterType = TSDB_ALTER_USER_CLEAR_WRITE_DB; + alterReq.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB; strcpy(alterReq.user, "u3"); strcpy(alterReq.pass, "1"); + strcpy(alterReq.dbname, "*"); int32_t contLen = tSerializeSAlterUserReq(NULL, 0, &alterReq); void* pReq = rpcMallocCont(contLen); @@ -253,9 +254,10 @@ TEST_F(MndTestUser, 03_Alter_User) { { SAlterUserReq alterReq = {0}; - alterReq.alterType = TSDB_ALTER_USER_CLEAR_READ_DB; + alterReq.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB; strcpy(alterReq.user, "u3"); strcpy(alterReq.pass, "1"); + strcpy(alterReq.dbname, "*"); int32_t contLen = tSerializeSAlterUserReq(NULL, 0, &alterReq); void* pReq = rpcMallocCont(contLen); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 5febb9a14c..5d6cbd2a58 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -65,6 +65,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { newCommitIndex = index; sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld", newCommitIndex, pSyncNode->commitIndex); + + syncEntryDestory(pEntry); break; } else { sTrace( diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 7f8ad150f0..323ef43e25 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -853,12 +853,13 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, taosThreadOnce(&transModuleInit, uvInitEnv); transSrvInst++; - char pipeName[64]; assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0)); #ifdef WINDOWS - snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc\\%p-%lu", taosSafeRand(), GetCurrentProcessId()); + char pipeName[64]; + snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId()); #else - snprintf(pipeName, sizeof(pipeName), ".trans.rpc\\%08X-%lu", taosSafeRand(), taosGetSelfPthreadId()); + char pipeName[PATH_MAX] = {0}; + snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); #endif assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName)); assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb)); @@ -871,20 +872,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, thrd->pTransInst = shandle; srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); - - // #ifdef WINDOWS - // uv_file fds[2]; - // if (uv_pipe(fds, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE) != 0) { - // #else - // uv_os_sock_t fds[2]; - // if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { - // #endif - // goto End; - // } - // uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); - // uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - - // thrd->fd = fds[0]; thrd->pipe = &(srv->pipe[i][1]); // init read if (false == addHandleToWorkloop(thrd,pipeName)) { diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index ab68c69b8d..4fd672fe3c 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -688,6 +688,16 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) { return getline(ptrBuf, &len, pFile->fp); #endif } +int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf) { + if (pFile == NULL || buf == NULL ) { + return -1; + } + assert(pFile->fp != NULL); + if (fgets(buf, maxSize, pFile->fp) == NULL) { + return -1; + } + return strlen(buf); +} int32_t taosEOFFile(TdFilePtr pFile) { if (pFile == NULL) { return 0; diff --git a/tests/script/sh/copy_udf.sh b/tests/script/sh/copy_udf.sh index e1d9ff53d2..7b5b5f4720 100755 --- a/tests/script/sh/copy_udf.sh +++ b/tests/script/sh/copy_udf.sh @@ -5,7 +5,7 @@ set +e echo "Executing copy_udf.sh" -SCRIPT_DIR=`dirname $0` +SCRIPT_DIR=`pwd` cd $SCRIPT_DIR/../ IN_TDINTERNAL="community" @@ -23,11 +23,12 @@ echo $UDF1_DIR echo $UDF2_DIR UDF_TMP=/tmp/udf +rm -rf $UDF_TMP mkdir $UDF_TMP -rm $UDF_TMP/libudf1.so -rm $UDF_TMP/libudf2.so echo "Copy udf shared library files to $UDF_TMP" -cp $UDF1_DIR $UDF_TMP +cp $UDF1_DIR $UDF_TMP +echo "copy udf1 result: $?" cp $UDF2_DIR $UDF_TMP +echo "copy udf2 result: $?" diff --git a/tests/script/tsim/tmq/consume.sh b/tests/script/tsim/tmq/consume.sh index 28e03d8fb9..3fa71d6edd 100755 --- a/tests/script/tsim/tmq/consume.sh +++ b/tests/script/tsim/tmq/consume.sh @@ -62,11 +62,7 @@ fi TOP_DIR=`pwd` -if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then - BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2,3` -else - BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2` -fi +BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2` declare -x BUILD_DIR=$TOP_DIR/$BIN_DIR diff --git a/tests/script/tsim/user/privilege1.sim b/tests/script/tsim/user/privilege1.sim new file mode 100644 index 0000000000..a7c5d9d13d --- /dev/null +++ b/tests/script/tsim/user/privilege1.sim @@ -0,0 +1,71 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print =============== show users +sql create database d1 vgroups 1; +sql create database d2 vgroups 1; +sql create database d3 vgroups 1; +sql show databases +if $rows != 5 then + return -1 +endi + +print =============== create users +sql create user user1 PASS 'user1' +sql create user user2 PASS 'user2' +sql show users +if $rows != 3 then + return -1 +endi + +print =============== test read +sql_error GRANT read ON d1.* to a; +sql_error GRANT read ON d0.* to user1; + +sql GRANT read ON d1.* to user1; +sql GRANT read ON d2.* to user1; +sql GRANT read ON *.* to user1; + +sql REVOKE read ON d1.* from user1; +sql REVOKE read ON d2.* from user1; +sql REVOKE read ON *.* from user1; + +print =============== test write +sql_error GRANT write ON d1.* to a; +sql_error GRANT write ON d0.* to user1; + +sql GRANT write ON d1.* to user1; +sql GRANT write ON d2.* to user1; +sql GRANT write ON *.* to user1; + +sql REVOKE write ON d1.* from user1; +sql REVOKE write ON d2.* from user1; +sql REVOKE write ON *.* from user1; + +print =============== test all +sql_error GRANT all ON d1.* to a; +sql_error GRANT all ON d0.* to user1; + +sql GRANT all ON d1.* to user1; +sql GRANT all ON d2.* to user1; +sql GRANT all ON *.* to user1; + +sql REVOKE all ON d1.* from user1; +sql REVOKE all ON d2.* from user1; +sql REVOKE all ON *.* from user1; + +print =============== test read write +sql_error GRANT read,write ON d1.* to a; +sql_error GRANT read,write ON d0.* to user1; + +sql GRANT read,write ON d1.* to user1; +sql GRANT read,write ON d2.* to user1; +sql GRANT read,write ON *.* to user1; + +sql REVOKE read,write ON d1.* from user1; +sql REVOKE read,write ON d2.* from user1; +sql REVOKE read,write ON *.* from user1; + +system sh/exec.sh -n dnode1 -s stop -x SIGINT