commit
dfb7ed0658
|
@ -16,6 +16,9 @@
|
|||
#ifndef _TD_SNODE_H_
|
||||
#define _TD_SNODE_H_
|
||||
|
||||
#include "tmsg.h"
|
||||
#include "trpc.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
|
|
@ -661,6 +661,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
|
|||
return buf;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
} SStreamScheduler;
|
||||
|
||||
typedef struct SMnodeMsg {
|
||||
char user[TSDB_USER_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_MND_SCHEDULER_H_
|
||||
#define _TD_MND_SCHEDULER_H_
|
||||
|
||||
#include "mndInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t mndInitScheduler(SMnode* pMnode);
|
||||
void mndCleanupScheduler(SMnode* pMnode);
|
||||
|
||||
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_MND_SCHEDULER_H_ */
|
|
@ -340,9 +340,16 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
|
|||
pVgroup->compStorage = pVload->compStorage;
|
||||
pVgroup->pointsWritten = pVload->pointsWritten;
|
||||
}
|
||||
bool roleChanged = false;
|
||||
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
|
||||
if (pVgroup->vnodeGid[vg].role != pVload->role) {
|
||||
roleChanged = true;
|
||||
}
|
||||
pVgroup->vnodeGid[vg].role = pVload->role;
|
||||
}
|
||||
if (roleChanged) {
|
||||
// notify scheduler role has changed
|
||||
}
|
||||
}
|
||||
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "mndScheduler.h"
|
||||
#include "mndConsumer.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndOffset.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndTopic.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "tcompare.h"
|
||||
#include "tname.h"
|
||||
|
||||
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
|
||||
SSdb* pSdb = pMnode->pSdb;
|
||||
SVgObj* pVgroup = NULL;
|
||||
SQueryDag* pDag = qStringToDag(pTopic->physicalPlan);
|
||||
SArray* pAray = NULL;
|
||||
SArray* unassignedVg = pSub->unassignedVg;
|
||||
|
||||
ASSERT(pSub->vgNum == 0);
|
||||
|
||||
int32_t levelNum = taosArrayGetSize(pDag->pSubplans);
|
||||
if (levelNum != 1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SArray* inner = taosArrayGet(pDag->pSubplans, 0);
|
||||
SSubplan* plan = taosArrayGetP(inner, 0);
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pTopic->dbUid) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
pSub->vgNum++;
|
||||
plan->execNode.nodeId = pVgroup->vgId;
|
||||
plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
|
||||
SMqConsumerEp consumerEp = {0};
|
||||
consumerEp.status = 0;
|
||||
consumerEp.consumerId = -1;
|
||||
consumerEp.epSet = plan->execNode.epset;
|
||||
consumerEp.vgId = plan->execNode.nodeId;
|
||||
int32_t msgLen;
|
||||
int32_t code = qSubPlanToString(plan, &consumerEp.qmsg, &msgLen);
|
||||
taosArrayPush(unassignedVg, &consumerEp);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndOffset.h"
|
||||
#include "mndScheduler.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndTopic.h"
|
||||
|
@ -97,12 +98,21 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
|
|||
strcpy(pSub->key, key);
|
||||
free(key);
|
||||
|
||||
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
|
||||
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
|
||||
tDeleteSMqSubscribeObj(pSub);
|
||||
free(pSub);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (mndInitUnassignedVg(pMnode, pTopic, pSub) < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tDeleteSMqSubscribeObj(pSub);
|
||||
free(pSub);
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
// TODO: disable alter subscribed table
|
||||
return pSub;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
||||
SSnode *pSnode = calloc(1, sizeof(SSnode));
|
||||
memcpy(&pSnode->cfg, pOption, sizeof(SSnodeOpt));
|
||||
return pSnode;
|
||||
}
|
||||
|
||||
|
|
|
@ -376,7 +376,10 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
|||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||
pTopic->buffer.output[i].status = 0;
|
||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
|
||||
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta};
|
||||
SReadHandle handle = {
|
||||
.reader = pReadHandle,
|
||||
.meta = pTq->pVnodeMeta,
|
||||
};
|
||||
pTopic->buffer.output[i].pReadHandle = pReadHandle;
|
||||
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
|||
pReadHandle->sver = -1;
|
||||
pReadHandle->pSchema = NULL;
|
||||
pReadHandle->pSchemaWrapper = NULL;
|
||||
pReadHandle->tbIdHash = NULL;
|
||||
return pReadHandle;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue