refactor: do some internal refactor.
This commit is contained in:
parent
6015729560
commit
f00b7b3af0
|
@ -256,8 +256,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
||||||
int32_t status = atomic_load_32(&pConsumer->status);
|
int32_t status = atomic_load_32(&pConsumer->status);
|
||||||
|
|
||||||
mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64,
|
mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64", hbstatus:%d",
|
||||||
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime);
|
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime,
|
||||||
|
hbStatus);
|
||||||
|
|
||||||
if (status == MQ_CONSUMER_STATUS__READY) {
|
if (status == MQ_CONSUMER_STATUS__READY) {
|
||||||
if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
|
if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
|
||||||
|
@ -269,6 +270,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
.pCont = pLostMsg,
|
.pCont = pLostMsg,
|
||||||
.contLen = sizeof(SMqConsumerLostMsg),
|
.contLen = sizeof(SMqConsumerLostMsg),
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||||
|
@ -282,6 +284,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
.pCont = pClearMsg,
|
.pCont = pClearMsg,
|
||||||
.contLen = sizeof(SMqConsumerClearMsg),
|
.contLen = sizeof(SMqConsumerClearMsg),
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
} else if (status == MQ_CONSUMER_STATUS__LOST) {
|
} else if (status == MQ_CONSUMER_STATUS__LOST) {
|
||||||
|
|
|
@ -551,8 +551,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
|
pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pSub->unassignedVgs);
|
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||||
|
|
|
@ -122,6 +122,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMsgHead *pMsgHead = (SMsgHead *)buf;
|
SMsgHead *pMsgHead = (SMsgHead *)buf;
|
||||||
|
|
||||||
pMsgHead->contLen = htonl(tlen);
|
pMsgHead->contLen = htonl(tlen);
|
||||||
|
|
|
@ -364,7 +364,8 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
|
||||||
|
|
||||||
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
|
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
|
||||||
const char *userName) {
|
const char *userName) {
|
||||||
mInfo("topic:%s to create", pCreate->name);
|
mInfo("topic:%s created", pCreate->name);
|
||||||
|
|
||||||
SMqTopicObj topicObj = {0};
|
SMqTopicObj topicObj = {0};
|
||||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
||||||
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
@ -383,19 +384,18 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
topicObj.sqlLen = strlen(pCreate->sql) + 1;
|
topicObj.sqlLen = strlen(pCreate->sql) + 1;
|
||||||
topicObj.subType = pCreate->subType;
|
topicObj.subType = pCreate->subType;
|
||||||
topicObj.withMeta = pCreate->withMeta;
|
topicObj.withMeta = pCreate->withMeta;
|
||||||
if (topicObj.withMeta) {
|
|
||||||
if (topicObj.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (topicObj.withMeta && topicObj.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
topicObj.ast = strdup(pCreate->ast);
|
topicObj.ast = strdup(pCreate->ast);
|
||||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||||
|
|
||||||
qDebugL("ast %s", topicObj.ast);
|
qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
|
||||||
|
|
||||||
SNode *pAst = NULL;
|
SNode *pAst = NULL;
|
||||||
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
|
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
|
||||||
|
@ -409,18 +409,20 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
|
|
||||||
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
||||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
|
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("failed to create topic:%s since %s", pCreate->name, terrstr());
|
||||||
taosMemoryFree(topicObj.ast);
|
taosMemoryFree(topicObj.ast);
|
||||||
taosMemoryFree(topicObj.sql);
|
taosMemoryFree(topicObj.sql);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t ntbUid;
|
|
||||||
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
|
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
|
||||||
if (topicObj.ntbColIds == NULL) {
|
if (topicObj.ntbColIds == NULL) {
|
||||||
|
taosMemoryFree(topicObj.ast);
|
||||||
|
taosMemoryFree(topicObj.sql);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
extractTopicTbInfo(pAst, &topicObj);
|
extractTopicTbInfo(pAst, &topicObj);
|
||||||
|
|
||||||
if (topicObj.ntbUid == 0) {
|
if (topicObj.ntbUid == 0) {
|
||||||
|
@ -467,7 +469,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
taosMemoryFreeClear(topicObj.physicalPlan);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
mInfo("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);
|
|
||||||
|
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
|
||||||
|
|
||||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
|
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
|
||||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||||
|
@ -476,6 +479,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
if (topicObj.ntbUid != 0) {
|
if (topicObj.ntbUid != 0) {
|
||||||
|
@ -544,7 +548,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
taosMemoryFreeClear(topicObj.sql);
|
taosMemoryFreeClear(topicObj.sql);
|
||||||
taosMemoryFreeClear(topicObj.ast);
|
taosMemoryFreeClear(topicObj.ast);
|
||||||
taosArrayDestroy(topicObj.ntbColIds);
|
taosArrayDestroy(topicObj.ntbColIds);
|
||||||
if (topicObj.schema.nCols) taosMemoryFreeClear(topicObj.schema.pSchema);
|
|
||||||
|
if (topicObj.schema.nCols) {
|
||||||
|
taosMemoryFreeClear(topicObj.schema.pSchema);
|
||||||
|
}
|
||||||
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
@ -589,11 +597,13 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb, pReq->info.conn.user);
|
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb, pReq->info.conn.user);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) {
|
||||||
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
}
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
mError("failed to create topic:%s since %s", createTopicReq.name, terrstr());
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
@ -605,13 +615,18 @@ _OVER:
|
||||||
|
|
||||||
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
|
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) goto _OVER;
|
if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
|
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
|
||||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER;
|
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER;
|
||||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -650,7 +665,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
SMqConsumerObj *pConsumer;
|
SMqConsumerObj *pConsumer;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
|
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
|
||||||
|
|
||||||
|
@ -661,7 +678,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s", dropReq.name,
|
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", dropReq.name,
|
||||||
pConsumer->consumerId, pConsumer->cgroup);
|
pConsumer->consumerId, pConsumer->cgroup);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -773,6 +790,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
||||||
|
*pNumOfTopics = 0;
|
||||||
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
|
@ -785,7 +804,9 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
|
||||||
while (1) {
|
while (1) {
|
||||||
SMqTopicObj *pTopic = NULL;
|
SMqTopicObj *pTopic = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (pTopic->dbUid == pDb->uid) {
|
if (pTopic->dbUid == pDb->uid) {
|
||||||
numOfTopics++;
|
numOfTopics++;
|
||||||
|
@ -814,10 +835,9 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0};
|
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0};
|
||||||
tstrncpy(varDataVal(topicName), mndGetDbStr(pTopic->name), sizeof(topicName) - 2);
|
const char* pName = mndGetDbStr(pTopic->name);
|
||||||
/*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/
|
STR_TO_VARSTR(topicName, pName);
|
||||||
/*tNameGetDbName(&n, varDataVal(topicName));*/
|
|
||||||
varDataSetLen(topicName, strlen(varDataVal(topicName)));
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
|
||||||
|
|
||||||
|
@ -825,6 +845,7 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
||||||
tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB);
|
tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB);
|
||||||
tNameGetDbName(&n, varDataVal(dbName));
|
tNameGetDbName(&n, varDataVal(dbName));
|
||||||
varDataSetLen(dbName, strlen(varDataVal(dbName)));
|
varDataSetLen(dbName, strlen(varDataVal(dbName)));
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)dbName, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)dbName, false);
|
||||||
|
|
||||||
|
@ -832,8 +853,8 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
|
||||||
|
|
||||||
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
|
STR_TO_VARSTR(sql, pTopic->sql);
|
||||||
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
|
||||||
|
|
||||||
|
|
|
@ -1062,10 +1062,14 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser);
|
pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
code = -1;
|
code = -1;
|
||||||
if (mndUserDupObj(pUser, &newUser) != 0) break;
|
if (mndUserDupObj(pUser, &newUser) != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
bool inTopic = (taosHashGet(newUser.topics, topic, len) != NULL);
|
bool inTopic = (taosHashGet(newUser.topics, topic, len) != NULL);
|
||||||
if (inTopic) {
|
if (inTopic) {
|
||||||
|
|
Loading…
Reference in New Issue