commit
294f8d0a6c
|
@ -14,5 +14,6 @@ ADD_SUBDIRECTORY(mnode)
|
|||
ADD_SUBDIRECTORY(vnode)
|
||||
ADD_SUBDIRECTORY(tsdb)
|
||||
ADD_SUBDIRECTORY(wal)
|
||||
ADD_SUBDIRECTORY(cq)
|
||||
ADD_SUBDIRECTORY(dnode)
|
||||
ADD_SUBDIRECTORY(connector/jdbc)
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||
PROJECT(TDengine)
|
||||
|
||||
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
|
||||
INCLUDE_DIRECTORIES(inc)
|
||||
|
||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
|
||||
|
||||
ADD_LIBRARY(tcq ${SRC})
|
||||
TARGET_LINK_LIBRARIES(tcq tutil common taos)
|
||||
|
||||
ADD_SUBDIRECTORY(test)
|
||||
|
|
@ -0,0 +1,249 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
#include "taosdef.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tglobal.h"
|
||||
#include "tlog.h"
|
||||
#include "twal.h"
|
||||
#include "tcq.h"
|
||||
#include "taos.h"
|
||||
|
||||
#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);}
|
||||
#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);}
|
||||
#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);}
|
||||
#define cPrint(...) {taosPrintLog("WAL ", 255, __VA_ARGS__);}
|
||||
|
||||
typedef struct {
|
||||
int vgId;
|
||||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_PASSWORD_LEN];
|
||||
FCqWrite cqWrite;
|
||||
void *ahandle;
|
||||
int num; // number of continuous streams
|
||||
struct SCqObj *pHead;
|
||||
void *dbConn;
|
||||
pthread_mutex_t mutex;
|
||||
} SCqContext;
|
||||
|
||||
typedef struct SCqObj {
|
||||
int tid; // table ID
|
||||
int rowSize; // bytes of a row
|
||||
char *sqlStr; // SQL string
|
||||
int columns; // number of columns
|
||||
SSchema *pSchema; // pointer to schema array
|
||||
void *pStream;
|
||||
struct SCqObj *prev;
|
||||
struct SCqObj *next;
|
||||
SCqContext *pContext;
|
||||
} SCqObj;
|
||||
|
||||
int cqDebugFlag = 135;
|
||||
|
||||
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
|
||||
|
||||
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
||||
|
||||
SCqContext *pContext = calloc(sizeof(SCqContext), 1);
|
||||
if (pContext == NULL) return NULL;
|
||||
|
||||
strcpy(pContext->user, pCfg->user);
|
||||
strcpy(pContext->pass, pCfg->pass);
|
||||
pContext->vgId = pCfg->vgId;
|
||||
pContext->cqWrite = pCfg->cqWrite;
|
||||
pContext->ahandle = ahandle;
|
||||
|
||||
pthread_mutex_init(&pContext->mutex, NULL);
|
||||
|
||||
cTrace("vgId:%d, CQ is opened", pContext->vgId);
|
||||
|
||||
return pContext;
|
||||
}
|
||||
|
||||
void cqClose(void *handle) {
|
||||
SCqContext *pContext = handle;
|
||||
|
||||
// stop all CQs
|
||||
cqStop(pContext);
|
||||
|
||||
// free all resources
|
||||
SCqObj *pObj = pContext->pHead;
|
||||
while (pObj) {
|
||||
SCqObj *pTemp = pObj;
|
||||
pObj = pObj->next;
|
||||
free(pTemp);
|
||||
}
|
||||
|
||||
pthread_mutex_destroy(&pContext->mutex);
|
||||
|
||||
cTrace("vgId:%d, CQ is closed", pContext->vgId);
|
||||
free(pContext);
|
||||
}
|
||||
|
||||
void cqStart(void *handle) {
|
||||
SCqContext *pContext = handle;
|
||||
cTrace("vgId:%d, start all CQs", pContext->vgId);
|
||||
if (pContext->dbConn) return;
|
||||
|
||||
pthread_mutex_lock(&pContext->mutex);
|
||||
|
||||
tscEmbedded = 1;
|
||||
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
|
||||
if (pContext->dbConn == NULL) {
|
||||
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
|
||||
pthread_mutex_unlock(&pContext->mutex);
|
||||
return;
|
||||
}
|
||||
|
||||
SCqObj *pObj = pContext->pHead;
|
||||
while (pObj) {
|
||||
int64_t lastKey = 0;
|
||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
||||
if (pObj->pStream) {
|
||||
pContext->num++;
|
||||
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
} else {
|
||||
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
}
|
||||
pObj = pObj->next;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pContext->mutex);
|
||||
}
|
||||
|
||||
void cqStop(void *handle) {
|
||||
SCqContext *pContext = handle;
|
||||
cTrace("vgId:%d, stop all CQs", pContext->vgId);
|
||||
if (pContext->dbConn == NULL) return;
|
||||
|
||||
pthread_mutex_lock(&pContext->mutex);
|
||||
|
||||
SCqObj *pObj = pContext->pHead;
|
||||
while (pObj) {
|
||||
if (pObj->pStream) {
|
||||
taos_close_stream(pObj->pStream);
|
||||
pObj->pStream = NULL;
|
||||
cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
}
|
||||
|
||||
pObj = pObj->next;
|
||||
}
|
||||
|
||||
if (pContext->dbConn) taos_close(pContext->dbConn);
|
||||
pContext->dbConn = NULL;
|
||||
|
||||
pthread_mutex_unlock(&pContext->mutex);
|
||||
}
|
||||
|
||||
void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int columns) {
|
||||
SCqContext *pContext = handle;
|
||||
|
||||
SCqObj *pObj = calloc(sizeof(SCqObj), 1);
|
||||
if (pObj == NULL) return NULL;
|
||||
|
||||
pObj->tid = tid;
|
||||
pObj->sqlStr = malloc(strlen(sqlStr)+1);
|
||||
strcpy(pObj->sqlStr, sqlStr);
|
||||
|
||||
pObj->columns = columns;
|
||||
|
||||
int size = sizeof(SSchema) * columns;
|
||||
pObj->pSchema = malloc(size);
|
||||
memcpy(pObj->pSchema, pSchema, size);
|
||||
|
||||
cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
|
||||
pthread_mutex_lock(&pContext->mutex);
|
||||
|
||||
pObj->next = pContext->pHead;
|
||||
if (pContext->pHead) pContext->pHead->prev = pObj;
|
||||
pContext->pHead = pObj;
|
||||
|
||||
if (pContext->dbConn) {
|
||||
int64_t lastKey = 0;
|
||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
||||
if (pObj->pStream) {
|
||||
pContext->num++;
|
||||
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
} else {
|
||||
cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pContext->mutex);
|
||||
|
||||
return pObj;
|
||||
}
|
||||
|
||||
void cqDrop(void *handle) {
|
||||
SCqObj *pObj = handle;
|
||||
SCqContext *pContext = pObj->pContext;
|
||||
|
||||
pthread_mutex_lock(&pContext->mutex);
|
||||
|
||||
if (pObj->prev) {
|
||||
pObj->prev->next = pObj->next;
|
||||
} else {
|
||||
pContext->pHead = pObj->next;
|
||||
}
|
||||
|
||||
if (pObj->next) {
|
||||
pObj->next->prev = pObj->prev;
|
||||
}
|
||||
|
||||
// free the resources associated
|
||||
if (pObj->pStream) taos_close_stream(pObj->pStream);
|
||||
pObj->pStream = NULL;
|
||||
|
||||
cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
free(pObj);
|
||||
|
||||
pthread_mutex_lock(&pContext->mutex);
|
||||
}
|
||||
|
||||
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
||||
SCqObj *pObj = (SCqObj *)param;
|
||||
SCqContext *pContext = pObj->pContext;
|
||||
if (pObj->pStream == NULL) return;
|
||||
|
||||
cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
|
||||
// construct data
|
||||
int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize;
|
||||
char *buffer = calloc(size, 1);
|
||||
|
||||
SWalHead *pHead = (SWalHead *)buffer;
|
||||
pHead->msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||
pHead->len = size - sizeof(SWalHead);
|
||||
|
||||
SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead));
|
||||
// to do: fill in the SSubmitMsg structure
|
||||
pSubmit->numOfBlocks = 1;
|
||||
|
||||
|
||||
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
|
||||
// to do: fill in the SSubmitBlk strucuture
|
||||
pBlk->tid = pObj->tid;
|
||||
|
||||
|
||||
// write into vnode write queue
|
||||
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ);
|
||||
}
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||
PROJECT(TDengine)
|
||||
|
||||
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
|
||||
INCLUDE_DIRECTORIES(../inc)
|
||||
|
||||
LIST(APPEND CQTEST_SRC ./cqtest.c)
|
||||
ADD_EXECUTABLE(cqtest ${CQTEST_SRC})
|
||||
TARGET_LINK_LIBRARIES(cqtest tcq)
|
||||
|
||||
ENDIF ()
|
||||
|
||||
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
//#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taosdef.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tglobal.h"
|
||||
#include "tlog.h"
|
||||
#include "tcq.h"
|
||||
|
||||
int64_t ver = 0;
|
||||
void *pCq = NULL;
|
||||
|
||||
int writeToQueue(void *pVnode, void *data, int type) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
int num = 3;
|
||||
|
||||
for (int i=1; i<argc; ++i) {
|
||||
if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||
ddebugFlag = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-n") == 0 && i <argc-1) {
|
||||
num = atoi(argv[++i]);
|
||||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
printf(" [-n num]: number of streams, default:%d\n", num);
|
||||
printf(" [-d debugFlag]: debug flag, default:%d\n", ddebugFlag);
|
||||
printf(" [-h help]: print out this help\n\n");
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
taosInitLog("cq.log", 100000, 10);
|
||||
|
||||
SCqCfg cqCfg;
|
||||
strcpy(cqCfg.user, "root");
|
||||
strcpy(cqCfg.pass, "taosdata");
|
||||
cqCfg.vgId = 2;
|
||||
cqCfg.cqWrite = writeToQueue;
|
||||
|
||||
pCq = cqOpen(NULL, &cqCfg);
|
||||
if (pCq == NULL) {
|
||||
printf("failed to open CQ\n");
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
SSchema schema[2];
|
||||
schema[0].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(schema[0].name, "ts");
|
||||
schema[0].colId = 0;
|
||||
schema[0].bytes = 8;
|
||||
|
||||
schema[1].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(schema[1].name, "avgspeed");
|
||||
schema[1].colId = 1;
|
||||
schema[1].bytes = 4;
|
||||
|
||||
for (int sid =1; sid<10; ++sid) {
|
||||
cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", schema, 2);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
char c = getchar();
|
||||
|
||||
switch(c) {
|
||||
case 's':
|
||||
cqStart(pCq);
|
||||
break;
|
||||
case 't':
|
||||
cqStop(pCq);
|
||||
break;
|
||||
case 'c':
|
||||
// create a CQ
|
||||
break;
|
||||
case 'd':
|
||||
// drop a CQ
|
||||
break;
|
||||
case 'q':
|
||||
break;
|
||||
default:
|
||||
printf("invalid command:%c", c);
|
||||
}
|
||||
|
||||
if (c=='q') break;
|
||||
}
|
||||
|
||||
cqClose(pCq);
|
||||
|
||||
taosCloseLog();
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -335,6 +335,11 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
|||
#define TSDB_PORT_MNODEDNODE 15
|
||||
#define TSDB_PORT_SYNC 20
|
||||
|
||||
#define TAOS_QTYPE_RPC 0
|
||||
#define TAOS_QTYPE_FWD 1
|
||||
#define TAOS_QTYPE_WAL 2
|
||||
#define TAOS_QTYPE_CQ 3
|
||||
|
||||
typedef enum {
|
||||
TSDB_PRECISION_MILLI,
|
||||
TSDB_PRECISION_MICRO,
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#ifndef _TD_CQ_H_
|
||||
#define _TD_CQ_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
|
||||
typedef int (*FCqWrite)(void *ahandle, void *pHead, int type);
|
||||
|
||||
typedef struct {
|
||||
int vgId;
|
||||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_PASSWORD_LEN];
|
||||
FCqWrite cqWrite;
|
||||
} SCqCfg;
|
||||
|
||||
// the following API shall be called by vnode
|
||||
void *cqOpen(void *ahandle, const SCqCfg *pCfg);
|
||||
void cqClose(void *handle);
|
||||
|
||||
// if vnode is master, vnode call this API to start CQ
|
||||
void cqStart(void *handle);
|
||||
|
||||
// if vnode is slave/unsynced, vnode shall call this API to stop CQ
|
||||
void cqStop(void *handle);
|
||||
|
||||
// cqCreate is called by TSDB to start an instance of CQ
|
||||
void *cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns);
|
||||
|
||||
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
|
||||
void cqDrop(void *handle);
|
||||
|
||||
extern int cqDebugFlag;
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // _TD_CQ_H_
|
|
@ -38,9 +38,9 @@ extern "C" {
|
|||
typedef struct {
|
||||
// WAL handle
|
||||
void *appH;
|
||||
void *cqH;
|
||||
int (*walCallBack)(void *);
|
||||
int (*eventCallBack)(void *);
|
||||
int (*cqueryCallBack)(void *);
|
||||
} STsdbAppH;
|
||||
|
||||
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
|
||||
|
|
|
@ -20,10 +20,6 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define TAOS_QTYPE_RPC 0
|
||||
#define TAOS_QTYPE_FWD 1
|
||||
#define TAOS_QTYPE_WAL 2
|
||||
|
||||
typedef void* taos_queue;
|
||||
typedef void* taos_qset;
|
||||
typedef void* taos_qall;
|
||||
|
|
|
@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
|||
AUX_SOURCE_DIRECTORY(src SRC)
|
||||
|
||||
ADD_LIBRARY(vnode ${SRC})
|
||||
TARGET_LINK_LIBRARIES(vnode tsdb)
|
||||
ENDIF ()
|
||||
TARGET_LINK_LIBRARIES(vnode tsdb tcq)
|
||||
ENDIF ()
|
||||
|
|
|
@ -30,6 +30,8 @@
|
|||
#include "vnode.h"
|
||||
#include "vnodeInt.h"
|
||||
#include "vnodeLog.h"
|
||||
#include "tcq.h"
|
||||
//#include "tsync.h"
|
||||
|
||||
static int32_t tsOpennedVnodes;
|
||||
static void *tsDnodeVnodesHash;
|
||||
|
@ -192,8 +194,28 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
||||
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
||||
|
||||
SCqCfg cqCfg;
|
||||
sprintf(cqCfg.user, "root");
|
||||
strcpy(cqCfg.pass, tsInternalPass);
|
||||
cqCfg.cqWrite = vnodeWriteToQueue;
|
||||
pVnode->cq = cqOpen(pVnode, &cqCfg);
|
||||
|
||||
STsdbAppH appH = {0};
|
||||
appH.appH = (void *)pVnode;
|
||||
appH.walCallBack = vnodeWalCallback;
|
||||
appH.cqH = pVnode->cq;
|
||||
|
||||
sprintf(temp, "%s/tsdb", rootDir);
|
||||
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
|
||||
if (pVnode->tsdb == NULL) {
|
||||
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
|
||||
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
sprintf(temp, "%s/wal", rootDir);
|
||||
pVnode->wal = walOpen(temp, &pVnode->walCfg);
|
||||
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
||||
|
||||
SSyncInfo syncInfo;
|
||||
syncInfo.vgId = pVnode->vgId;
|
||||
|
@ -208,24 +230,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
syncInfo.notifyRole = vnodeNotifyRole;
|
||||
pVnode->sync = syncStart(&syncInfo);
|
||||
|
||||
// start continuous query
|
||||
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
|
||||
cqStart(pVnode->cq);
|
||||
|
||||
pVnode->events = NULL;
|
||||
pVnode->cq = NULL;
|
||||
|
||||
STsdbAppH appH = {0};
|
||||
appH.appH = (void *)pVnode;
|
||||
appH.walCallBack = vnodeWalCallback;
|
||||
|
||||
sprintf(temp, "%s/tsdb", rootDir);
|
||||
void *pTsdb = tsdbOpenRepo(temp, &appH);
|
||||
if (pTsdb == NULL) {
|
||||
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
|
||||
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pVnode->tsdb = pTsdb;
|
||||
|
||||
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
||||
|
||||
pVnode->status = TAOS_VN_STATUS_READY;
|
||||
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
|
||||
|
@ -350,10 +359,16 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
|
|||
pVnode->sync = NULL;
|
||||
}
|
||||
|
||||
tsdbCloseRepo(pVnode->tsdb);
|
||||
walClose(pVnode->wal);
|
||||
vnodeSaveVersion(pVnode);
|
||||
cqClose(pVnode->cq);
|
||||
pVnode->cq = NULL;
|
||||
|
||||
tsdbCloseRepo(pVnode->tsdb);
|
||||
pVnode->tsdb = NULL;
|
||||
|
||||
walClose(pVnode->wal);
|
||||
pVnode->wal = NULL;
|
||||
|
||||
vnodeSaveVersion(pVnode);
|
||||
vnodeRelease(pVnode);
|
||||
}
|
||||
|
||||
|
@ -377,6 +392,11 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) {
|
|||
static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
||||
SVnodeObj *pVnode = ahandle;
|
||||
pVnode->role = role;
|
||||
|
||||
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
|
||||
cqStart(pVnode->cq);
|
||||
else
|
||||
cqStop(pVnode->cq);
|
||||
}
|
||||
|
||||
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "vnode.h"
|
||||
#include "vnodeInt.h"
|
||||
#include "vnodeLog.h"
|
||||
#include "tcq.h"
|
||||
|
||||
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
|
||||
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
||||
|
@ -149,7 +150,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
|||
}
|
||||
|
||||
code = tsdbCreateTable(pVnode->tsdb, &tCfg);
|
||||
|
||||
tfree(pDestSchema);
|
||||
|
||||
dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code);
|
||||
|
|
Loading…
Reference in New Issue