integrating wal with tq

This commit is contained in:
Liu Jicong 2022-01-20 15:39:28 +08:00
parent 6bf4614bfa
commit 5b51229577
4 changed files with 14 additions and 12 deletions

View File

@ -177,7 +177,7 @@ typedef struct STqBuffer {
typedef struct STqTopicHandle { typedef struct STqTopicHandle {
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_TOPIC_FNAME_LEN];
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
@ -297,6 +297,7 @@ typedef struct STQ {
STqCfg* tqConfig; STqCfg* tqConfig;
STqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
SWal * pWal;
} STQ; } STQ;
typedef struct STqMgmt { typedef struct STqMgmt {
@ -311,7 +312,7 @@ int tqInit();
void tqCleanUp(); void tqCleanUp();
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
void tqClose(STQ*); void tqClose(STQ*);
// void* will be replace by a msg type // void* will be replace by a msg type

View File

@ -50,7 +50,7 @@ void tqCleanUp() {
taosTmrCleanUp(tqMgmt.timer); taosTmrCleanUp(tqMgmt.timer);
} }
STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
STQ* pTq = malloc(sizeof(STQ)); STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;

View File

@ -117,14 +117,6 @@ static int vnodeOpenImpl(SVnode *pVnode) {
return -1; return -1;
} }
// Open TQ
sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) {
// TODO: handle error
return -1;
}
// Open WAL // Open WAL
sprintf(dir, "%s/wal", pVnode->path); sprintf(dir, "%s/wal", pVnode->path);
pVnode->pWal = walOpen(dir, &(pVnode->config.walCfg)); pVnode->pWal = walOpen(dir, &(pVnode->config.walCfg));
@ -133,6 +125,14 @@ static int vnodeOpenImpl(SVnode *pVnode) {
return -1; return -1;
} }
// Open TQ
sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, pVnode->pWal, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) {
// TODO: handle error
return -1;
}
// Open Query // Open Query
if (vnodeQueryOpen(pVnode)) { if (vnodeQueryOpen(pVnode)) {
return -1; return -1;

View File

@ -121,7 +121,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: handle error // TODO: handle error
} }
strcpy(pTopic->topicName, req.topicName); strcpy(pTopic->topicName, req.topicName);
strcpy(pTopic->cGroup, req.cGroup); strcpy(pTopic->cgroup, req.cGroup);
strcpy(pTopic->sql, req.sql); strcpy(pTopic->sql, req.sql);
strcpy(pTopic->logicalPlan, req.logicalPlan); strcpy(pTopic->logicalPlan, req.logicalPlan);
strcpy(pTopic->physicalPlan, req.physicalPlan); strcpy(pTopic->physicalPlan, req.physicalPlan);
@ -143,6 +143,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pTopic->buffer.output[i].pMsg = pMsg; pTopic->buffer.output[i].pMsg = pMsg;
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
} }
pTopic->pReadhandle = walOpenReadHandle(pVnode->pTq->pWal);
// write mq meta // write mq meta
} }
break; break;