Merge pull request #9925 from taosdata/feature/tq

integrating wal with tq
This commit is contained in:
Liu Jicong 2022-01-20 15:52:43 +08:00 committed by GitHub
commit 825f69d2ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 14 additions and 12 deletions

View File

@ -177,7 +177,7 @@ typedef struct STqBuffer {
typedef struct STqTopicHandle {
char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_TOPIC_FNAME_LEN];
char* sql;
char* logicalPlan;
char* physicalPlan;
@ -297,6 +297,7 @@ typedef struct STQ {
STqCfg* tqConfig;
STqMemRef tqMemRef;
STqMetaStore* tqMeta;
SWal * pWal;
} STQ;
typedef struct STqMgmt {
@ -311,7 +312,7 @@ int tqInit();
void tqCleanUp();
// open in each vnode
STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
void tqClose(STQ*);
// void* will be replace by a msg type

View File

@ -50,7 +50,7 @@ void tqCleanUp() {
taosTmrCleanUp(tqMgmt.timer);
}
STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;

View File

@ -117,14 +117,6 @@ static int vnodeOpenImpl(SVnode *pVnode) {
return -1;
}
// Open TQ
sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) {
// TODO: handle error
return -1;
}
// Open WAL
sprintf(dir, "%s/wal", pVnode->path);
pVnode->pWal = walOpen(dir, &(pVnode->config.walCfg));
@ -133,6 +125,14 @@ static int vnodeOpenImpl(SVnode *pVnode) {
return -1;
}
// Open TQ
sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, pVnode->pWal, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) {
// TODO: handle error
return -1;
}
// Open Query
if (vnodeQueryOpen(pVnode)) {
return -1;

View File

@ -121,7 +121,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: handle error
}
strcpy(pTopic->topicName, req.topicName);
strcpy(pTopic->cGroup, req.cGroup);
strcpy(pTopic->cgroup, req.cGroup);
strcpy(pTopic->sql, req.sql);
strcpy(pTopic->logicalPlan, req.logicalPlan);
strcpy(pTopic->physicalPlan, req.physicalPlan);
@ -143,6 +143,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pTopic->buffer.output[i].pMsg = pMsg;
pTopic->buffer.output[i].status = 0;
}
pTopic->pReadhandle = walOpenReadHandle(pVnode->pTq->pWal);
// write mq meta
}
break;