Merge branch '3.0' of github.com:taosdata/TDengine into feature/udf
This commit is contained in:
commit
3a6a8a96cb
|
@ -83,7 +83,7 @@ int32_t create_stream() {
|
|||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||
pRes = taos_query(
|
||||
pConn,
|
||||
"create stream stream1 trigger window_close into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
|
||||
"create stream stream1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
|
||||
"from tu1 interval(10m)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||
|
|
|
@ -221,15 +221,12 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
|
|||
DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
|
||||
DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
|
||||
|
||||
#if 0
|
||||
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
|
||||
#endif
|
||||
|
||||
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
|
||||
|
||||
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
|
||||
|
||||
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
|
||||
|
||||
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
|
||||
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
|
||||
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
||||
|
@ -240,6 +237,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *
|
|||
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
|
||||
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
|
||||
#endif
|
||||
|
||||
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
|
||||
|
||||
enum tmq_conf_res_t {
|
||||
|
@ -268,12 +266,9 @@ DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
|
|||
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
|
||||
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
|
||||
#endif
|
||||
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
|
||||
#if 0
|
||||
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
|
||||
DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql);
|
||||
#endif
|
||||
|
||||
/* ------------------------------ TMQ END -------------------------------- */
|
||||
|
||||
#if 1 // Shuduo: temporary enable for app build
|
||||
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
|
||||
#endif
|
||||
|
|
|
@ -45,7 +45,6 @@ extern "C" {
|
|||
|
||||
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
|
||||
#define TSDB_PERFS_TABLE_SMAS "smas"
|
||||
#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes"
|
||||
#define TSDB_PERFS_TABLE_CONNECTIONS "connections"
|
||||
#define TSDB_PERFS_TABLE_QUERIES "queries"
|
||||
#define TSDB_PERFS_TABLE_TOPICS "topics"
|
||||
|
|
|
@ -99,7 +99,7 @@ typedef enum _mgmt_table {
|
|||
TSDB_MGMT_TABLE_VGROUP,
|
||||
TSDB_MGMT_TABLE_TOPICS,
|
||||
TSDB_MGMT_TABLE_CONSUMERS,
|
||||
TSDB_MGMT_TABLE_SUBSCRIBES,
|
||||
TSDB_MGMT_TABLE_SUBSCRIPTIONS,
|
||||
TSDB_MGMT_TABLE_TRANS,
|
||||
TSDB_MGMT_TABLE_SMAS,
|
||||
TSDB_MGMT_TABLE_CONFIGS,
|
||||
|
@ -131,12 +131,10 @@ typedef enum _mgmt_table {
|
|||
#define TSDB_ALTER_USER_SUPERUSER 0x2
|
||||
#define TSDB_ALTER_USER_ADD_READ_DB 0x3
|
||||
#define TSDB_ALTER_USER_REMOVE_READ_DB 0x4
|
||||
#define TSDB_ALTER_USER_CLEAR_READ_DB 0x5
|
||||
#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6
|
||||
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7
|
||||
#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8
|
||||
#define TSDB_ALTER_USER_ADD_ALL_DB 0x9
|
||||
#define TSDB_ALTER_USER_REMOVE_ALL_DB 0xA
|
||||
#define TSDB_ALTER_USER_ADD_WRITE_DB 0x5
|
||||
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x6
|
||||
#define TSDB_ALTER_USER_ADD_ALL_DB 0x7
|
||||
#define TSDB_ALTER_USER_REMOVE_ALL_DB 0x8
|
||||
|
||||
#define TSDB_ALTER_USER_PRIVILEGES 0x2
|
||||
|
||||
|
|
|
@ -80,6 +80,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count);
|
|||
void taosFprintfFile(TdFilePtr pFile, const char *format, ...);
|
||||
|
||||
int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict ptrBuf);
|
||||
int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf);
|
||||
|
||||
int32_t taosEOFFile(TdFilePtr pFile);
|
||||
|
||||
|
|
|
@ -86,10 +86,9 @@ extern const int32_t TYPE_BYTES[15];
|
|||
#define TS_PATH_DELIMITER "."
|
||||
#define TS_ESCAPE_CHAR '`'
|
||||
|
||||
|
||||
#define TSDB_TIME_PRECISION_MILLI 0
|
||||
#define TSDB_TIME_PRECISION_MICRO 1
|
||||
#define TSDB_TIME_PRECISION_NANO 2
|
||||
#define TSDB_TIME_PRECISION_MILLI 0
|
||||
#define TSDB_TIME_PRECISION_MICRO 1
|
||||
#define TSDB_TIME_PRECISION_NANO 2
|
||||
#define TSDB_TIME_PRECISION_HOURS 3
|
||||
#define TSDB_TIME_PRECISION_MINUTES 4
|
||||
#define TSDB_TIME_PRECISION_SECONDS 5
|
||||
|
@ -249,7 +248,6 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_SHOW_SQL_LEN 512
|
||||
#define TSDB_SLOW_QUERY_SQL_LEN 512
|
||||
#define TSDB_SHOW_SUBQUERY_LEN 1000
|
||||
#define TSDB_SHOW_LIST_LEN 1000
|
||||
|
||||
#define TSDB_TRANS_STAGE_LEN 12
|
||||
#define TSDB_TRANS_TYPE_LEN 16
|
||||
|
@ -376,9 +374,9 @@ typedef enum ELogicConditionType {
|
|||
* 1. ordinary sub query for select * from super_table
|
||||
* 2. all sqlobj generated by createSubqueryObj with this flag
|
||||
*/
|
||||
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
|
||||
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
|
||||
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
|
||||
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
|
||||
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
|
||||
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
|
||||
|
||||
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
|
||||
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
|
||||
|
|
|
@ -395,7 +395,7 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
|
|||
}
|
||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||
SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
|
||||
tmq_list_append(*topics, topic->topicName);
|
||||
tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
|
||||
}
|
||||
return TMQ_RESP_ERR__SUCCESS;
|
||||
}
|
||||
|
|
|
@ -14,9 +14,9 @@
|
|||
*/
|
||||
|
||||
#include "systable.h"
|
||||
#include "taos.h"
|
||||
#include "tdef.h"
|
||||
#include "types.h"
|
||||
#include "taos.h"
|
||||
|
||||
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||
|
@ -265,7 +265,7 @@ static const SSysDbTableSchema consumerSchema[] = {
|
|||
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "topics", .bytes = TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
|
@ -274,8 +274,8 @@ static const SSysDbTableSchema consumerSchema[] = {
|
|||
};
|
||||
|
||||
static const SSysDbTableSchema subscriptionSchema[] = {
|
||||
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||
};
|
||||
|
|
|
@ -26,7 +26,7 @@ int32_t mndInitAuth(SMnode *pMnode);
|
|||
void mndCleanupAuth(SMnode *pMnode);
|
||||
|
||||
int32_t mndCheckCreateUserAuth(SUserObj *pOperUser);
|
||||
int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, SAlterUserReq *pAlter);
|
||||
int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
|
||||
int32_t mndCheckDropUserAuth(SUserObj *pOperUser);
|
||||
|
||||
int32_t mndCheckNodeAuth(SUserObj *pOperUser);
|
||||
|
|
|
@ -33,6 +33,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
|
|||
|
||||
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
|
||||
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -79,14 +79,12 @@ int32_t mndCheckCreateUserAuth(SUserObj *pOperUser) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb, SAlterUserReq *pAlter) {
|
||||
int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter) {
|
||||
if (pAlter->alterType == TSDB_ALTER_USER_PASSWD) {
|
||||
if (pOperUser->superUser || strcmp(pUser->user, pOperUser->user) == 0) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (pAlter->alterType == TSDB_ALTER_USER_SUPERUSER) {
|
||||
} else if (pAlter->alterType == TSDB_ALTER_USER_SUPERUSER) {
|
||||
if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) {
|
||||
terrno = TSDB_CODE_MND_NO_RIGHTS;
|
||||
return -1;
|
||||
|
@ -95,21 +93,12 @@ int32_t mndCheckAlterUserAuth(SUserObj *pOperUser, SUserObj *pUser, SDbObj *pDb,
|
|||
if (pOperUser->superUser) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (pAlter->alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB || pAlter->alterType == TSDB_ALTER_USER_CLEAR_READ_DB) {
|
||||
} else {
|
||||
if (pOperUser->superUser) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (pAlter->alterType == TSDB_ALTER_USER_ADD_READ_DB || pAlter->alterType == TSDB_ALTER_USER_REMOVE_READ_DB ||
|
||||
pAlter->alterType == TSDB_ALTER_USER_ADD_WRITE_DB || pAlter->alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) {
|
||||
if (pOperUser->superUser || strcmp(pUser->user, pDb->createUser) == 0) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_MND_NO_RIGHTS;
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -790,71 +790,83 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
|
||||
if (pShow->pIter == NULL) break;
|
||||
|
||||
SColumnInfoData *pColInfo;
|
||||
int32_t cols = 0;
|
||||
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
|
||||
// consumer id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);
|
||||
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||
bool hasTopic = true;
|
||||
if (topicSz == 0) {
|
||||
hasTopic = false;
|
||||
topicSz = 1;
|
||||
}
|
||||
|
||||
// group id
|
||||
char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN);
|
||||
varDataSetLen(groupId, strlen(varDataVal(groupId)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)groupId, false);
|
||||
for (int32_t i = 0; i < topicSz; i++) {
|
||||
if (numOfRows + topicSz > rowsCapacity) {
|
||||
blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
|
||||
}
|
||||
SColumnInfoData *pColInfo;
|
||||
int32_t cols = 0;
|
||||
|
||||
// app id
|
||||
char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN);
|
||||
varDataSetLen(appId, strlen(varDataVal(appId)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)appId, false);
|
||||
// consumer id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);
|
||||
|
||||
// status
|
||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20);
|
||||
varDataSetLen(status, strlen(varDataVal(status)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
|
||||
// group id
|
||||
char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN);
|
||||
varDataSetLen(groupId, strlen(varDataVal(groupId)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)groupId, false);
|
||||
|
||||
// subscribed topics
|
||||
// TODO: split into multiple rows
|
||||
char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char *showStr = taosShowStrArray(pConsumer->assignedTopics);
|
||||
tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN);
|
||||
taosMemoryFree(showStr);
|
||||
varDataSetLen(topics, strlen(varDataVal(topics)));
|
||||
// app id
|
||||
char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN);
|
||||
varDataSetLen(appId, strlen(varDataVal(appId)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)appId, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)topics, false);
|
||||
// status
|
||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20);
|
||||
varDataSetLen(status, strlen(varDataVal(status)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
|
||||
|
||||
// pid
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true);
|
||||
// one subscribed topic
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
if (hasTopic) {
|
||||
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
|
||||
tstrncpy(varDataVal(topic), topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
varDataSetLen(topic, strlen(varDataVal(topic)));
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
|
||||
} else {
|
||||
colDataAppend(pColInfo, numOfRows, NULL, true);
|
||||
}
|
||||
|
||||
// end point
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);
|
||||
// pid
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true);
|
||||
|
||||
// up time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);
|
||||
// end point
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);
|
||||
|
||||
// subscribe time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);
|
||||
// up time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);
|
||||
|
||||
// rebalance time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
|
||||
// subscribe time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);
|
||||
|
||||
// rebalance time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
|
||||
pShow->numOfRows += numOfRows;
|
||||
|
|
|
@ -85,8 +85,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
|
|||
type = TSDB_MGMT_TABLE_VGROUP;
|
||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_CONSUMERS, len) == 0) {
|
||||
type = TSDB_MGMT_TABLE_CONSUMERS;
|
||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIBES, len) == 0) {
|
||||
type = TSDB_MGMT_TABLE_SUBSCRIBES;
|
||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIPTIONS, len) == 0) {
|
||||
type = TSDB_MGMT_TABLE_SUBSCRIPTIONS;
|
||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_TRANS, len) == 0) {
|
||||
type = TSDB_MGMT_TABLE_TRANS;
|
||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SMAS, len) == 0) {
|
||||
|
|
|
@ -259,7 +259,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
|||
return -1;
|
||||
}
|
||||
|
||||
#if 1
|
||||
#if 0
|
||||
printf("|");
|
||||
for (int i = 0; i < pStream->outputSchema.nCols; i++) {
|
||||
printf(" %15s |", (char *)pStream->outputSchema.pSchema[i].name);
|
||||
|
|
|
@ -44,6 +44,9 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs
|
|||
static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg);
|
||||
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg);
|
||||
|
||||
static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
|
||||
|
||||
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
|
||||
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
|
@ -71,6 +74,10 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
|||
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
|
||||
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
|
@ -706,3 +713,124 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
|||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||
SMnode *pMnode = pReq->pNode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SMqSubscribeObj *pSub = NULL;
|
||||
|
||||
while (numOfRows < rowsCapacity) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
|
||||
if (pShow->pIter == NULL) break;
|
||||
|
||||
taosRLockLatch(&pSub->lock);
|
||||
|
||||
if (numOfRows + pSub->vgNum > rowsCapacity) {
|
||||
blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
|
||||
}
|
||||
|
||||
SMqConsumerEp *pConsumerEp = NULL;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
|
||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
|
||||
SColumnInfoData *pColInfo;
|
||||
int32_t cols = 0;
|
||||
|
||||
// topic and cgroup
|
||||
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
mndSplitSubscribeKey(pSub->key, topic, cgroup);
|
||||
varDataSetLen(topic, strlen(varDataVal(topic)));
|
||||
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
|
||||
|
||||
// vg id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
|
||||
|
||||
// consumer id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
|
||||
|
||||
// offset
|
||||
#if 0
|
||||
// subscribe time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
|
||||
|
||||
// rebalance time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
|
||||
#endif
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
||||
|
||||
SColumnInfoData *pColInfo;
|
||||
int32_t cols = 0;
|
||||
|
||||
// topic and cgroup
|
||||
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
mndSplitSubscribeKey(pSub->key, topic, cgroup);
|
||||
varDataSetLen(topic, strlen(varDataVal(topic)));
|
||||
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
|
||||
|
||||
// vg id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
|
||||
|
||||
// consumer id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, NULL, true);
|
||||
|
||||
// offset
|
||||
#if 0
|
||||
// subscribe time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
|
||||
|
||||
// rebalance time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
|
||||
#endif
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
sdbRelease(pSdb, pSub);
|
||||
}
|
||||
|
||||
pShow->numOfRows += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj
|
|||
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
|
||||
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
|
||||
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
|
||||
|
||||
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
|
||||
|
||||
|
@ -61,6 +62,11 @@ int32_t mndInitTopic(SMnode *pMnode) {
|
|||
|
||||
void mndCleanupTopic(SMnode *pMnode) {}
|
||||
|
||||
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
|
||||
//
|
||||
return strchr(topic, '.') + 1;
|
||||
}
|
||||
|
||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
|
|
|
@ -394,6 +394,8 @@ static SHashObj *mndDupDbHash(SHashObj *pOld) {
|
|||
|
||||
static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pNode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
int32_t code = -1;
|
||||
SUserObj *pUser = NULL;
|
||||
SUserObj *pOperUser = NULL;
|
||||
|
@ -429,7 +431,13 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckAlterUserAuth(pOperUser, pUser, &alterReq) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
memcpy(&newUser, pUser, sizeof(SUserObj));
|
||||
newUser.authVersion++;
|
||||
newUser.updateTime = taosGetTimestampMs();
|
||||
|
||||
taosRLockLatch(&pUser->lock);
|
||||
newUser.readDbs = mndDupDbHash(pUser->readDbs);
|
||||
|
@ -440,63 +448,90 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
int32_t len = strlen(alterReq.dbname) + 1;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
|
||||
if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) {
|
||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||
taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass);
|
||||
memcpy(newUser.pass, pass, TSDB_PASSWORD_LEN);
|
||||
} else if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) {
|
||||
newUser.superUser = alterReq.superUser;
|
||||
} else if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB) {
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
newUser.authVersion++;
|
||||
} else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB) {
|
||||
if (taosHashRemove(newUser.readDbs, alterReq.dbname, len) != 0) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
newUser.authVersion++;
|
||||
} else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_READ_DB) {
|
||||
taosHashClear(newUser.readDbs);
|
||||
newUser.authVersion++;
|
||||
} else if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB) {
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
newUser.authVersion++;
|
||||
} else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) {
|
||||
if (taosHashRemove(newUser.writeDbs, alterReq.dbname, len) != 0) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
newUser.authVersion++;
|
||||
} else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB) {
|
||||
taosHashClear(newUser.writeDbs);
|
||||
newUser.authVersion++;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_INVALID_ALTER_OPER;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
newUser.updateTime = taosGetTimestampMs();
|
||||
if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER) {
|
||||
newUser.superUser = alterReq.superUser;
|
||||
}
|
||||
|
||||
if (mndCheckAlterUserAuth(pOperUser, pUser, pDb, &alterReq) != 0) {
|
||||
goto _OVER;
|
||||
if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) {
|
||||
if (strcmp(alterReq.dbname, "*") != 0) {
|
||||
int32_t len = strlen(alterReq.dbname) + 1;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
|
||||
if (pDb == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
goto _OVER;
|
||||
}
|
||||
if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
goto _OVER;
|
||||
}
|
||||
} else {
|
||||
while (1) {
|
||||
SDbObj *pDb = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb);
|
||||
if (pIter == NULL) break;
|
||||
int32_t len = strlen(pDb->name) + 1;
|
||||
taosHashPut(newUser.readDbs, pDb->name, len, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
sdbRelease(pSdb, pDb);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) {
|
||||
if (strcmp(alterReq.dbname, "*") != 0) {
|
||||
int32_t len = strlen(alterReq.dbname) + 1;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
|
||||
if (pDb == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
goto _OVER;
|
||||
}
|
||||
if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
goto _OVER;
|
||||
}
|
||||
} else {
|
||||
while (1) {
|
||||
SDbObj *pDb = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb);
|
||||
if (pIter == NULL) break;
|
||||
int32_t len = strlen(pDb->name) + 1;
|
||||
taosHashPut(newUser.writeDbs, pDb->name, len, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
sdbRelease(pSdb, pDb);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_DB) {
|
||||
if (strcmp(alterReq.dbname, "*") != 0) {
|
||||
int32_t len = strlen(alterReq.dbname) + 1;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
|
||||
if (pDb == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
goto _OVER;
|
||||
}
|
||||
taosHashRemove(newUser.readDbs, alterReq.dbname, len);
|
||||
} else {
|
||||
taosHashClear(newUser.readDbs);
|
||||
}
|
||||
}
|
||||
|
||||
if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB || alterReq.alterType == TSDB_ALTER_USER_REMOVE_ALL_DB) {
|
||||
if (strcmp(alterReq.dbname, "*") != 0) {
|
||||
int32_t len = strlen(alterReq.dbname) + 1;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
|
||||
if (pDb == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
goto _OVER;
|
||||
}
|
||||
taosHashRemove(newUser.writeDbs, alterReq.dbname, len);
|
||||
} else {
|
||||
taosHashClear(newUser.writeDbs);
|
||||
}
|
||||
}
|
||||
|
||||
code = mndAlterUser(pMnode, pUser, &newUser, pReq);
|
||||
|
|
|
@ -238,9 +238,10 @@ TEST_F(MndTestUser, 03_Alter_User) {
|
|||
|
||||
{
|
||||
SAlterUserReq alterReq = {0};
|
||||
alterReq.alterType = TSDB_ALTER_USER_CLEAR_WRITE_DB;
|
||||
alterReq.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB;
|
||||
strcpy(alterReq.user, "u3");
|
||||
strcpy(alterReq.pass, "1");
|
||||
strcpy(alterReq.dbname, "*");
|
||||
|
||||
int32_t contLen = tSerializeSAlterUserReq(NULL, 0, &alterReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
|
@ -253,9 +254,10 @@ TEST_F(MndTestUser, 03_Alter_User) {
|
|||
|
||||
{
|
||||
SAlterUserReq alterReq = {0};
|
||||
alterReq.alterType = TSDB_ALTER_USER_CLEAR_READ_DB;
|
||||
alterReq.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB;
|
||||
strcpy(alterReq.user, "u3");
|
||||
strcpy(alterReq.pass, "1");
|
||||
strcpy(alterReq.dbname, "*");
|
||||
|
||||
int32_t contLen = tSerializeSAlterUserReq(NULL, 0, &alterReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
|
|
|
@ -65,6 +65,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
newCommitIndex = index;
|
||||
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld",
|
||||
newCommitIndex, pSyncNode->commitIndex);
|
||||
|
||||
syncEntryDestory(pEntry);
|
||||
break;
|
||||
} else {
|
||||
sTrace(
|
||||
|
|
|
@ -853,12 +853,13 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
taosThreadOnce(&transModuleInit, uvInitEnv);
|
||||
transSrvInst++;
|
||||
|
||||
char pipeName[64];
|
||||
assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0));
|
||||
#ifdef WINDOWS
|
||||
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc\\%p-%lu", taosSafeRand(), GetCurrentProcessId());
|
||||
char pipeName[64];
|
||||
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId());
|
||||
#else
|
||||
snprintf(pipeName, sizeof(pipeName), ".trans.rpc\\%08X-%lu", taosSafeRand(), taosGetSelfPthreadId());
|
||||
char pipeName[PATH_MAX] = {0};
|
||||
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId());
|
||||
#endif
|
||||
assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
|
||||
assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));
|
||||
|
@ -871,20 +872,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
thrd->pTransInst = shandle;
|
||||
|
||||
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
|
||||
|
||||
// #ifdef WINDOWS
|
||||
// uv_file fds[2];
|
||||
// if (uv_pipe(fds, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE) != 0) {
|
||||
// #else
|
||||
// uv_os_sock_t fds[2];
|
||||
// if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
|
||||
// #endif
|
||||
// goto End;
|
||||
// }
|
||||
// uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
|
||||
// uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
|
||||
|
||||
// thrd->fd = fds[0];
|
||||
thrd->pipe = &(srv->pipe[i][1]); // init read
|
||||
|
||||
if (false == addHandleToWorkloop(thrd,pipeName)) {
|
||||
|
|
|
@ -55,15 +55,15 @@ int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
|
|||
}
|
||||
|
||||
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
|
||||
int code = 0;
|
||||
int ret = 0;
|
||||
|
||||
TdFilePtr pIdxTFile = pRead->pReadIdxTFile;
|
||||
TdFilePtr pLogTFile = pRead->pReadLogTFile;
|
||||
|
||||
// seek position
|
||||
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
|
||||
code = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
|
||||
if (code < 0) {
|
||||
ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
@ -72,14 +72,14 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
|
|||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
// TODO:deserialize
|
||||
|
||||
ASSERT(entry.ver == ver);
|
||||
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
||||
if (code < 0) {
|
||||
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
return code;
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
|
||||
|
@ -108,7 +108,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
|
|||
}
|
||||
|
||||
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
|
||||
int code;
|
||||
SWal *pWal = pRead->pWal;
|
||||
if (ver == pRead->curVersion) {
|
||||
return 0;
|
||||
|
@ -126,16 +125,15 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
|
|||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||
ASSERT(pRet != NULL);
|
||||
if (pRead->curFileFirstVer != pRet->firstVer) {
|
||||
code = walReadChangeFile(pRead, pRet->firstVer);
|
||||
if (code < 0) {
|
||||
if (walReadChangeFile(pRead, pRet->firstVer) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
code = walReadSeekFilePos(pRead, pRet->firstVer, ver);
|
||||
if (code < 0) {
|
||||
if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRead->curVersion = ver;
|
||||
|
||||
return 0;
|
||||
|
@ -246,8 +244,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
|||
int code;
|
||||
// TODO: check wal life
|
||||
if (pRead->curVersion != ver) {
|
||||
code = walReadSeekVer(pRead, ver);
|
||||
if (code < 0) {
|
||||
if (walReadSeekVer(pRead, ver) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -278,8 +275,12 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
|||
pRead->capacity = pRead->pHead->head.bodyLen;
|
||||
}
|
||||
|
||||
if (pRead->pHead->head.bodyLen !=
|
||||
taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) {
|
||||
if ((code = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
|
||||
pRead->pHead->head.bodyLen) {
|
||||
if (code < 0)
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
else
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -688,6 +688,16 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) {
|
|||
return getline(ptrBuf, &len, pFile->fp);
|
||||
#endif
|
||||
}
|
||||
int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf) {
|
||||
if (pFile == NULL || buf == NULL ) {
|
||||
return -1;
|
||||
}
|
||||
assert(pFile->fp != NULL);
|
||||
if (fgets(buf, maxSize, pFile->fp) == NULL) {
|
||||
return -1;
|
||||
}
|
||||
return strlen(buf);
|
||||
}
|
||||
int32_t taosEOFFile(TdFilePtr pFile) {
|
||||
if (pFile == NULL) {
|
||||
return 0;
|
||||
|
|
|
@ -746,7 +746,8 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
|
|||
return false;
|
||||
}
|
||||
taosCloseSocket(&pSocket);
|
||||
return 0 == taosValidIp(ip) ? true : false;
|
||||
return true;
|
||||
// return 0 == taosValidIp(ip) ? true : false;
|
||||
}
|
||||
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
|
||||
struct sockaddr_in serverAdd;
|
||||
|
|
|
@ -5,7 +5,7 @@ set +e
|
|||
|
||||
echo "Executing copy_udf.sh"
|
||||
|
||||
SCRIPT_DIR=`dirname $0`
|
||||
SCRIPT_DIR=`pwd`
|
||||
cd $SCRIPT_DIR/../
|
||||
|
||||
IN_TDINTERNAL="community"
|
||||
|
@ -23,11 +23,12 @@ echo $UDF1_DIR
|
|||
echo $UDF2_DIR
|
||||
|
||||
UDF_TMP=/tmp/udf
|
||||
rm -rf $UDF_TMP
|
||||
mkdir $UDF_TMP
|
||||
rm $UDF_TMP/libudf1.so
|
||||
rm $UDF_TMP/libudf2.so
|
||||
|
||||
echo "Copy udf shared library files to $UDF_TMP"
|
||||
|
||||
cp $UDF1_DIR $UDF_TMP
|
||||
cp $UDF1_DIR $UDF_TMP
|
||||
echo "copy udf1 result: $?"
|
||||
cp $UDF2_DIR $UDF_TMP
|
||||
echo "copy udf2 result: $?"
|
||||
|
|
|
@ -62,11 +62,7 @@ fi
|
|||
|
||||
TOP_DIR=`pwd`
|
||||
|
||||
if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then
|
||||
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2,3`
|
||||
else
|
||||
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2`
|
||||
fi
|
||||
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2`
|
||||
|
||||
declare -x BUILD_DIR=$TOP_DIR/$BIN_DIR
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ if $rows != 3 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql create stream s1 into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m)
|
||||
sql create stream s1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m)
|
||||
|
||||
sql show stables
|
||||
if $rows != 2 then
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
print =============== show users
|
||||
sql create database d1 vgroups 1;
|
||||
sql create database d2 vgroups 1;
|
||||
sql create database d3 vgroups 1;
|
||||
sql show databases
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== create users
|
||||
sql create user user1 PASS 'user1'
|
||||
sql create user user2 PASS 'user2'
|
||||
sql show users
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== test read
|
||||
sql_error GRANT read ON d1.* to a;
|
||||
sql_error GRANT read ON d0.* to user1;
|
||||
|
||||
sql GRANT read ON d1.* to user1;
|
||||
sql GRANT read ON d2.* to user1;
|
||||
sql GRANT read ON *.* to user1;
|
||||
|
||||
sql REVOKE read ON d1.* from user1;
|
||||
sql REVOKE read ON d2.* from user1;
|
||||
sql REVOKE read ON *.* from user1;
|
||||
|
||||
print =============== test write
|
||||
sql_error GRANT write ON d1.* to a;
|
||||
sql_error GRANT write ON d0.* to user1;
|
||||
|
||||
sql GRANT write ON d1.* to user1;
|
||||
sql GRANT write ON d2.* to user1;
|
||||
sql GRANT write ON *.* to user1;
|
||||
|
||||
sql REVOKE write ON d1.* from user1;
|
||||
sql REVOKE write ON d2.* from user1;
|
||||
sql REVOKE write ON *.* from user1;
|
||||
|
||||
print =============== test all
|
||||
sql_error GRANT all ON d1.* to a;
|
||||
sql_error GRANT all ON d0.* to user1;
|
||||
|
||||
sql GRANT all ON d1.* to user1;
|
||||
sql GRANT all ON d2.* to user1;
|
||||
sql GRANT all ON *.* to user1;
|
||||
|
||||
sql REVOKE all ON d1.* from user1;
|
||||
sql REVOKE all ON d2.* from user1;
|
||||
sql REVOKE all ON *.* from user1;
|
||||
|
||||
print =============== test read write
|
||||
sql_error GRANT read,write ON d1.* to a;
|
||||
sql_error GRANT read,write ON d0.* to user1;
|
||||
|
||||
sql GRANT read,write ON d1.* to user1;
|
||||
sql GRANT read,write ON d2.* to user1;
|
||||
sql GRANT read,write ON *.* to user1;
|
||||
|
||||
sql REVOKE read,write ON d1.* from user1;
|
||||
sql REVOKE read,write ON d2.* from user1;
|
||||
sql REVOKE read,write ON *.* from user1;
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue