diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index ec886286f3..e394297ab8 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -177,7 +177,7 @@ typedef struct STqBuffer { typedef struct STqTopicHandle { char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_TOPIC_FNAME_LEN]; char* sql; char* logicalPlan; char* physicalPlan; @@ -297,6 +297,7 @@ typedef struct STQ { STqCfg* tqConfig; STqMemRef tqMemRef; STqMetaStore* tqMeta; + SWal * pWal; } STQ; typedef struct STqMgmt { @@ -311,7 +312,7 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); void tqClose(STQ*); // void* will be replace by a msg type diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ad71005153..15c7971252 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -50,7 +50,7 @@ void tqCleanUp() { taosTmrCleanUp(tqMgmt.timer); } -STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { STQ* pTq = malloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index fd79341adf..c4bbd93eda 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -117,14 +117,6 @@ static int vnodeOpenImpl(SVnode *pVnode) { return -1; } - // Open TQ - sprintf(dir, "%s/tq", pVnode->path); - pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); - if (pVnode->pTq == NULL) { - // TODO: handle error - return -1; - } - // Open WAL sprintf(dir, "%s/wal", pVnode->path); pVnode->pWal = walOpen(dir, &(pVnode->config.walCfg)); @@ -133,6 +125,14 @@ static int vnodeOpenImpl(SVnode *pVnode) { return -1; } + // Open TQ + sprintf(dir, "%s/tq", pVnode->path); + pVnode->pTq = tqOpen(dir, pVnode->pWal, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); + if (pVnode->pTq == NULL) { + // TODO: handle error + return -1; + } + // Open Query if (vnodeQueryOpen(pVnode)) { return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index f05520a960..6e2d438970 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -121,7 +121,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: handle error } strcpy(pTopic->topicName, req.topicName); - strcpy(pTopic->cGroup, req.cGroup); + strcpy(pTopic->cgroup, req.cGroup); strcpy(pTopic->sql, req.sql); strcpy(pTopic->logicalPlan, req.logicalPlan); strcpy(pTopic->physicalPlan, req.physicalPlan); @@ -143,6 +143,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pTopic->buffer.output[i].pMsg = pMsg; pTopic->buffer.output[i].status = 0; } + pTopic->pReadhandle = walOpenReadHandle(pVnode->pTq->pWal); // write mq meta } break;