Merge pull request #11994 from taosdata/feature/tq
feat(stream): support perf schema
This commit is contained in:
commit
059a5a9159
|
@ -132,6 +132,7 @@ extern const int32_t TYPE_BYTES[15];
|
||||||
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
|
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
|
||||||
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
|
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
|
||||||
#define TSDB_PERFS_TABLE_OFFSETS "offsets"
|
#define TSDB_PERFS_TABLE_OFFSETS "offsets"
|
||||||
|
#define TSDB_PERFS_TABLE_STREAMS "streams"
|
||||||
|
|
||||||
#define TSDB_INDEX_TYPE_SMA "SMA"
|
#define TSDB_INDEX_TYPE_SMA "SMA"
|
||||||
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
|
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
|
||||||
|
|
|
@ -188,7 +188,6 @@ typedef struct SRequestSendRecvBody {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t resType;
|
int8_t resType;
|
||||||
int32_t code;
|
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
SSchemaWrapper schema;
|
SSchemaWrapper schema;
|
||||||
|
|
|
@ -582,8 +582,9 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char sourceDb[TSDB_DB_FNAME_LEN];
|
||||||
char outputSTbName[TSDB_TABLE_FNAME_LEN];
|
char targetDb[TSDB_DB_FNAME_LEN];
|
||||||
|
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
||||||
int64_t createTime;
|
int64_t createTime;
|
||||||
int64_t updateTime;
|
int64_t updateTime;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
|
|
|
@ -812,6 +812,7 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
|
||||||
|
|
||||||
// subscribed topics
|
// subscribed topics
|
||||||
|
// TODO: split into multiple rows
|
||||||
char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0};
|
char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
char *showStr = taosShowStrArray(pConsumer->assignedTopics);
|
char *showStr = taosShowStrArray(pConsumer->assignedTopics);
|
||||||
tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN);
|
tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN);
|
||||||
|
|
|
@ -410,7 +410,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
||||||
int32_t sz = 0;
|
int32_t sz = 0;
|
||||||
/*int32_t outputNameSz = 0;*/
|
/*int32_t outputNameSz = 0;*/
|
||||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||||
|
@ -456,7 +456,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
||||||
|
|
||||||
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
||||||
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||||
if (tDecodeCStrTo(pDecoder, pObj->db) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||||
|
|
|
@ -124,14 +124,6 @@ static const SInfosTableSchema userStbsSchema[] = {
|
||||||
{.name = "table_comment", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "table_comment", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SInfosTableSchema userStreamsSchema[] = {
|
|
||||||
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "dest_table", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
|
||||||
{.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
};
|
|
||||||
|
|
||||||
static const SInfosTableSchema userTblsSchema[] = {
|
static const SInfosTableSchema userTblsSchema[] = {
|
||||||
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
@ -259,7 +251,6 @@ static const SInfosTableMeta infosMeta[] = {
|
||||||
{TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
|
{TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
|
||||||
{TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)},
|
{TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)},
|
||||||
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
|
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
|
||||||
{TSDB_INS_TABLE_USER_STREAMS, userStreamsSchema, tListLen(userStreamsSchema)},
|
|
||||||
{TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)},
|
{TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)},
|
||||||
{TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
|
{TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
|
||||||
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
|
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
|
||||||
|
|
|
@ -76,6 +76,18 @@ static const SPerfsTableSchema offsetSchema[] = {
|
||||||
{.name = "skip_log_cnt", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "skip_log_cnt", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static const SPerfsTableSchema streamSchema[] = {
|
||||||
|
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
|
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
|
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
|
{.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
|
};
|
||||||
|
|
||||||
static const SPerfsTableMeta perfsMeta[] = {
|
static const SPerfsTableMeta perfsMeta[] = {
|
||||||
{TSDB_PERFS_TABLE_CONNECTIONS, connectionsSchema, tListLen(connectionsSchema)},
|
{TSDB_PERFS_TABLE_CONNECTIONS, connectionsSchema, tListLen(connectionsSchema)},
|
||||||
{TSDB_PERFS_TABLE_QUERIES, queriesSchema, tListLen(queriesSchema)},
|
{TSDB_PERFS_TABLE_QUERIES, queriesSchema, tListLen(queriesSchema)},
|
||||||
|
@ -83,6 +95,7 @@ static const SPerfsTableMeta perfsMeta[] = {
|
||||||
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
|
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
|
||||||
{TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)},
|
{TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)},
|
||||||
{TSDB_PERFS_TABLE_OFFSETS, offsetSchema, tListLen(offsetSchema)},
|
{TSDB_PERFS_TABLE_OFFSETS, offsetSchema, tListLen(offsetSchema)},
|
||||||
|
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
|
||||||
};
|
};
|
||||||
|
|
||||||
// connection/application/
|
// connection/application/
|
||||||
|
|
|
@ -382,7 +382,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
||||||
|
|
||||||
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
|
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
|
||||||
SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
|
SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb);
|
||||||
ASSERT(pDb);
|
ASSERT(pDb);
|
||||||
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
||||||
sdbRelease(pSdb, pDb);
|
sdbRelease(pSdb, pDb);
|
||||||
|
|
|
@ -40,7 +40,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessMDropSmaReq(SNodeMsg *pReq);
|
static int32_t mndProcessMDropSmaReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessVCreateSmaRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessVCreateSmaRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
|
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitSma(SMnode *pMnode) {
|
int32_t mndInitSma(SMnode *pMnode) {
|
||||||
|
@ -406,7 +406,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
|
||||||
|
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||||
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
streamObj.createTime = taosGetTimestampMs();
|
streamObj.createTime = taosGetTimestampMs();
|
||||||
streamObj.updateTime = streamObj.createTime;
|
streamObj.updateTime = streamObj.createTime;
|
||||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||||
|
@ -686,9 +686,9 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
|
int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SSmaObj *pSma = NULL;
|
SSmaObj *pSma = NULL;
|
||||||
|
|
||||||
pSma = mndAcquireSma(pMnode, indexReq->indexFName);
|
pSma = mndAcquireSma(pMnode, indexReq->indexFName);
|
||||||
if (pSma == NULL) {
|
if (pSma == NULL) {
|
||||||
|
@ -701,13 +701,14 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserI
|
||||||
strcpy(rsp->indexType, TSDB_INDEX_TYPE_SMA);
|
strcpy(rsp->indexType, TSDB_INDEX_TYPE_SMA);
|
||||||
|
|
||||||
SNodeList *pList = NULL;
|
SNodeList *pList = NULL;
|
||||||
int32_t extOffset = 0;
|
int32_t extOffset = 0;
|
||||||
code = nodesStringToList(pSma->expr, &pList);
|
code = nodesStringToList(pSma->expr, &pList);
|
||||||
if (0 == code) {
|
if (0 == code) {
|
||||||
SNode *node = NULL;
|
SNode *node = NULL;
|
||||||
FOREACH(node, pList) {
|
FOREACH(node, pList) {
|
||||||
SFunctionNode *pFunc = (SFunctionNode *)node;
|
SFunctionNode *pFunc = (SFunctionNode *)node;
|
||||||
extOffset += snprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s", (extOffset ? ",":""), pFunc->functionName);
|
extOffset += snprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
|
||||||
|
(extOffset ? "," : ""), pFunc->functionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
*exist = true;
|
*exist = true;
|
||||||
|
@ -718,13 +719,12 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserI
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp) {
|
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp) {
|
||||||
mndTransProcessRsp(pRsp);
|
mndTransProcessRsp(pRsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
|
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
@ -758,8 +758,8 @@ static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlo
|
||||||
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&stbName));
|
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&stbName));
|
||||||
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char*) n, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)n, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
|
||||||
|
|
|
@ -40,7 +40,7 @@ static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp);
|
||||||
/*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/
|
/*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/
|
||||||
static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq);
|
static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq);
|
||||||
static int32_t mndGetStreamMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
static int32_t mndGetStreamMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitStream(SMnode *pMnode) {
|
int32_t mndInitStream(SMnode *pMnode) {
|
||||||
|
@ -58,8 +58,8 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
|
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
||||||
|
|
||||||
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
|
||||||
/*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);*/
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
|
||||||
|
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
@ -294,8 +294,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
||||||
mDebug("stream:%s to create", pCreate->name);
|
mDebug("stream:%s to create", pCreate->name);
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||||
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
tstrncpy(streamObj.outputSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN);
|
tstrncpy(streamObj.targetSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
streamObj.createTime = taosGetTimestampMs();
|
streamObj.createTime = taosGetTimestampMs();
|
||||||
streamObj.updateTime = streamObj.createTime;
|
streamObj.updateTime = streamObj.createTime;
|
||||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||||
|
@ -424,58 +424,55 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
int32_t cols = 0;
|
|
||||||
char *pWrite;
|
|
||||||
char prefix[TSDB_DB_FNAME_LEN] = {0};
|
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
|
|
||||||
if (pDb == NULL) return 0;
|
|
||||||
|
|
||||||
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
|
|
||||||
strcat(prefix, TS_PATH_DELIMITER);
|
|
||||||
int32_t prefixLen = (int32_t)strlen(prefix);
|
|
||||||
|
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rows) {
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
|
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pStream);
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
if (pStream->dbUid != pDb->uid) {
|
SColumnInfoData *pColInfo;
|
||||||
if (strncmp(pStream->name, prefix, prefixLen) != 0) {
|
SName n;
|
||||||
mError("Inconsistent stream data, name:%s, db:%s, dbUid:%" PRIu64, pStream->name, pDb->name, pDb->uid);
|
int32_t cols = 0;
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pStream);
|
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
continue;
|
tNameFromString(&n, pStream->name, T_NAME_ACCT | T_NAME_DB);
|
||||||
}
|
tNameGetDbName(&n, varDataVal(streamName));
|
||||||
|
varDataSetLen(streamName, strlen(varDataVal(streamName)));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)streamName, false);
|
||||||
|
|
||||||
cols = 0;
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
|
||||||
|
|
||||||
char streamName[TSDB_TABLE_NAME_LEN] = {0};
|
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
tstrncpy(streamName, pStream->name + prefixLen, TSDB_TABLE_NAME_LEN);
|
tstrncpy(&sql[VARSTR_HEADER_SIZE], pStream->sql, TSDB_SHOW_SQL_LEN);
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
|
||||||
STR_TO_VARSTR(pWrite, streamName);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
cols++;
|
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
*(int64_t *)pWrite = pStream->createTime;
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->status, true);
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pStream->sql, pShow->bytes[cols]);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->sourceDb, true);
|
||||||
cols++;
|
|
||||||
|
|
||||||
numOfRows++;
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
sdbRelease(pSdb, pStream);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetDb, true);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetSTbName, true);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->waterMark, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false);
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
pShow->numOfRows += numOfRows;
|
|
||||||
return numOfRows;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
|
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
|
||||||
|
|
|
@ -309,9 +309,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
newConsumerEp.consumerId = consumerId;
|
newConsumerEp.consumerId = consumerId;
|
||||||
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
||||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
||||||
/*SMqConsumer* pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/
|
|
||||||
/*ASSERT(pTestNew->consumerId == consumerId);*/
|
|
||||||
/*ASSERT(pTestNew->vgs == newConsumerEp.vgs);*/
|
|
||||||
taosArrayPush(pOutput->newConsumers, &consumerId);
|
taosArrayPush(pOutput->newConsumers, &consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -369,7 +366,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 8. generate logs
|
// 8. TODO generate logs
|
||||||
|
mInfo("rebalance calculation completed, rebalanced vg:");
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
|
||||||
|
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
|
||||||
|
mInfo("vg: %d moved from consumer %ld to consumer %ld", pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId,
|
||||||
|
pOutputRebVg->newConsumerId);
|
||||||
|
}
|
||||||
|
|
||||||
// 9. clear
|
// 9. clear
|
||||||
taosHashCleanup(pHash);
|
taosHashCleanup(pHash);
|
||||||
|
@ -447,7 +450,9 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
|
||||||
goto REB_FAIL;
|
goto REB_FAIL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 4. commit log: modification log
|
// 4. TODO commit log: modification log
|
||||||
|
|
||||||
|
// 5. execution
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL;
|
||||||
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
|
@ -61,7 +61,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
||||||
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
len = pMsg->contLen - sizeof(SMsgHead);
|
len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
// todo: change the interface here
|
|
||||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||||
vError("vgId: %d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId: %d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -81,7 +81,7 @@
|
||||||
./test.sh -f tsim/insert/backquote.sim -m
|
./test.sh -f tsim/insert/backquote.sim -m
|
||||||
./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
|
./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
|
||||||
./test.sh -f tsim/query/interval-offset.sim -m
|
./test.sh -f tsim/query/interval-offset.sim -m
|
||||||
#./test.sh -f tsim/tmq/basic1.sim -m
|
./test.sh -f tsim/tmq/basic3.sim -m
|
||||||
./test.sh -f tsim/stable/vnode3.sim -m
|
./test.sh -f tsim/stable/vnode3.sim -m
|
||||||
./test.sh -f tsim/qnode/basic1.sim -m
|
./test.sh -f tsim/qnode/basic1.sim -m
|
||||||
./test.sh -f tsim/mnode/basic1.sim -m
|
./test.sh -f tsim/mnode/basic1.sim -m
|
||||||
|
|
Loading…
Reference in New Issue