opti:tmq
This commit is contained in:
parent
c866c9ad02
commit
5624efff4e
|
@ -353,6 +353,66 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t sendCheckInfoToVnode(STrans *pTrans, SMnode *pMnode, SMqTopicObj *topicObj){
|
||||||
|
STqCheckInfo info;
|
||||||
|
memcpy(info.topic, topicObj->name, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
info.ntbUid = topicObj->ntbUid;
|
||||||
|
info.colIdList = topicObj->ntbColIds;
|
||||||
|
// broadcast forbid alter info
|
||||||
|
void *pIter = NULL;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
void *buf = NULL;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
// iterate vg
|
||||||
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
if (!mndVgroupInDb(pVgroup, topicObj->dbUid)) {
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// encoder check alter info
|
||||||
|
int32_t len;
|
||||||
|
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||||
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, abuf, len);
|
||||||
|
code = tEncodeSTqCheckInfo(&encoder, &info);
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
|
||||||
|
// add redo action
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
action.pCont = buf;
|
||||||
|
action.contLen = sizeof(SMsgHead) + len;
|
||||||
|
action.msgType = TDMT_VND_TMQ_ADD_CHECKINFO;
|
||||||
|
code = mndTransAppendRedoAction(pTrans, &action);
|
||||||
|
if (code != 0) {
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
buf = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
END:
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
|
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
|
||||||
const char *userName) {
|
const char *userName) {
|
||||||
mInfo("start to create topic:%s", pCreate->name);
|
mInfo("start to create topic:%s", pCreate->name);
|
||||||
|
@ -396,13 +456,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
topicObj.withMeta = pCreate->withMeta;
|
topicObj.withMeta = pCreate->withMeta;
|
||||||
|
|
||||||
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
if (pCreate->withMeta) {
|
|
||||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
|
||||||
code = terrno;
|
|
||||||
goto _OUT;
|
|
||||||
}
|
|
||||||
|
|
||||||
topicObj.ast = taosStrdup(pCreate->ast);
|
topicObj.ast = taosStrdup(pCreate->ast);
|
||||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||||
|
|
||||||
|
@ -474,59 +527,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
if (topicObj.ntbUid != 0) {
|
if (topicObj.ntbUid != 0) {
|
||||||
STqCheckInfo info;
|
code = sendCheckInfoToVnode(pTrans, pMnode, &topicObj);
|
||||||
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
|
if (code != 0){
|
||||||
info.ntbUid = topicObj.ntbUid;
|
goto _OUT;
|
||||||
info.colIdList = topicObj.ntbColIds;
|
|
||||||
// broadcast forbid alter info
|
|
||||||
void *pIter = NULL;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
SVgObj *pVgroup = NULL;
|
|
||||||
while (1) {
|
|
||||||
// iterate vg
|
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
if (!mndVgroupInDb(pVgroup, topicObj.dbUid)) {
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// encoder check alter info
|
|
||||||
int32_t len;
|
|
||||||
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
|
|
||||||
if (code < 0) {
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
goto _OUT;
|
|
||||||
}
|
|
||||||
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
|
||||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
|
||||||
SEncoder encoder;
|
|
||||||
tEncoderInit(&encoder, abuf, len);
|
|
||||||
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
|
|
||||||
taosMemoryFree(buf);
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
code = -1;
|
|
||||||
goto _OUT;
|
|
||||||
}
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
|
|
||||||
// add redo action
|
|
||||||
STransAction action = {0};
|
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
|
||||||
action.pCont = buf;
|
|
||||||
action.contLen = sizeof(SMsgHead) + len;
|
|
||||||
action.msgType = TDMT_VND_TMQ_ADD_CHECKINFO;
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(buf);
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
code = -1;
|
|
||||||
goto _OUT;
|
|
||||||
}
|
|
||||||
buf = NULL;
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -88,10 +88,6 @@ int32_t tqMetaOpen(STQ* pTq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (tqMetaRestoreHandle(pTq) < 0) {
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (tqMetaRestoreCheckInfo(pTq) < 0) {
|
if (tqMetaRestoreCheckInfo(pTq) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -167,32 +163,30 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int vLen = 0;
|
int vLen = 0;
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
tdbTbcMoveToFirst(pCur);
|
tdbTbcMoveToFirst(pCur);
|
||||||
|
|
||||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||||
STqCheckInfo info;
|
STqCheckInfo info;
|
||||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
|
code = tDecodeSTqCheckInfo(&decoder, &info);
|
||||||
|
if (code != 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tdbFree(pKey);
|
goto END;
|
||||||
tdbFree(pVal);
|
|
||||||
tdbTbcClose(pCur);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
|
code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo));
|
||||||
|
if (code != 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tdbFree(pKey);
|
goto END;
|
||||||
tdbFree(pVal);
|
|
||||||
tdbTbcClose(pCur);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
END:
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
||||||
|
|
Loading…
Reference in New Issue