first draft
This commit is contained in:
parent
11353abc18
commit
cdcb0daa8a
|
@ -14,5 +14,6 @@ ADD_SUBDIRECTORY(mnode)
|
||||||
ADD_SUBDIRECTORY(vnode)
|
ADD_SUBDIRECTORY(vnode)
|
||||||
ADD_SUBDIRECTORY(tsdb)
|
ADD_SUBDIRECTORY(tsdb)
|
||||||
ADD_SUBDIRECTORY(wal)
|
ADD_SUBDIRECTORY(wal)
|
||||||
|
ADD_SUBDIRECTORY(cq)
|
||||||
ADD_SUBDIRECTORY(dnode)
|
ADD_SUBDIRECTORY(dnode)
|
||||||
ADD_SUBDIRECTORY(connector/jdbc)
|
ADD_SUBDIRECTORY(connector/jdbc)
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||||
|
PROJECT(TDengine)
|
||||||
|
|
||||||
|
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||||
|
INCLUDE_DIRECTORIES(inc)
|
||||||
|
|
||||||
|
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
|
||||||
|
|
||||||
|
ADD_LIBRARY(tcq ${SRC})
|
||||||
|
TARGET_LINK_LIBRARIES(tcq tutil common taos)
|
||||||
|
|
||||||
|
ADD_SUBDIRECTORY(test)
|
||||||
|
|
|
@ -14,154 +14,266 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
|
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include "taosdef.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "vnode.h"
|
#include "tlog.h"
|
||||||
|
#include "twal.h"
|
||||||
|
#include "tcq.h"
|
||||||
|
#include "taos.h"
|
||||||
|
|
||||||
/* static TAOS *dbConn = NULL; */
|
#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);}
|
||||||
void vnodeCloseStreamCallback(void *param);
|
#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);}
|
||||||
|
#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);}
|
||||||
|
#define cPrint(...) {taosPrintLog("WAL ", 255, __VA_ARGS__);}
|
||||||
|
|
||||||
void cqOpen(void *param, void *tmrId) {
|
typedef struct {
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
int vgId;
|
||||||
SMeterObj *pObj;
|
char path[TSDB_FILENAME_LEN];
|
||||||
|
char user[TSDB_USER_LEN];
|
||||||
|
char pass[TSDB_PASSWORD_LEN];
|
||||||
|
FCqWrite cqWrite;
|
||||||
|
void *ahandle;
|
||||||
|
int num; // number of continuous streams
|
||||||
|
struct SCqObj *pHead;
|
||||||
|
void *dbConn;
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
} SCqContext;
|
||||||
|
|
||||||
if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return;
|
typedef struct SCqObj {
|
||||||
if (pVnode->meterList == NULL) return;
|
int sid; // table ID
|
||||||
|
int rowSize; // bytes of a row
|
||||||
|
char *sqlStr; // SQL string
|
||||||
|
int columns; // number of columns
|
||||||
|
SSchema *pSchema; // pointer to schema array
|
||||||
|
void *pStream;
|
||||||
|
struct SCqObj *next;
|
||||||
|
SCqContext *pContext;
|
||||||
|
} SCqObj;
|
||||||
|
|
||||||
taosTmrStopA(&pVnode->streamTimer);
|
int cqDebugFlag = 135;
|
||||||
pVnode->streamTimer = NULL;
|
|
||||||
|
|
||||||
for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
|
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
|
||||||
pObj = pVnode->meterList[sid];
|
|
||||||
if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) continue;
|
|
||||||
|
|
||||||
dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql);
|
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
||||||
|
|
||||||
|
SCqContext *pContext = calloc(sizeof(SCqContext), 1);
|
||||||
|
if (pContext == NULL) return NULL;
|
||||||
|
|
||||||
if (pVnode->dbConn == NULL) {
|
strcpy(pContext->user, pCfg->user);
|
||||||
char db[64] = {0};
|
strcpy(pContext->pass, pCfg->pass);
|
||||||
char user[64] = {0};
|
strcpy(pContext->path, pCfg->path);
|
||||||
vnodeGetDBFromMeterId(pObj, db);
|
pContext->vgId = pCfg->vgId;
|
||||||
sprintf(user, "_%s", pVnode->cfg.acct);
|
pContext->cqWrite = pCfg->cqWrite;
|
||||||
pVnode->dbConn = taos_connect(NULL, user, tsInternalPass, db, 0);
|
pContext->ahandle = ahandle;
|
||||||
|
|
||||||
|
// open meta data file
|
||||||
|
|
||||||
|
// loop each record
|
||||||
|
while (1) {
|
||||||
|
SCqObj *pObj = calloc(sizeof(SCqObj), 1);
|
||||||
|
if (pObj == NULL) {
|
||||||
|
cError("vgId:%d, no memory", pContext->vgId);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->dbConn == NULL) {
|
pObj->next = pContext->pHead;
|
||||||
dError("vid:%d, failed to connect to mgmt node", pVnode->vnode);
|
pContext->pHead = pObj;
|
||||||
taosTmrReset(vnodeOpenStreams, 1000, param, vnodeTmrCtrl, &pVnode->streamTimer);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pObj->pStream == NULL) {
|
// assigne each field in SCqObj
|
||||||
pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj,
|
// pObj->sid =
|
||||||
vnodeCloseStreamCallback);
|
// strcpy(pObj->sqlStr, ?? );
|
||||||
if (pObj->pStream) pVnode->numOfStreams++;
|
// schema, columns
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_mutex_init(&pContext->mutex, NULL);
|
||||||
|
|
||||||
|
cTrace("vgId:%d, CQ is opened", pContext->vgId);
|
||||||
|
|
||||||
|
return pContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close all streams in a vnode
|
void cqClose(void *handle) {
|
||||||
void cqClose(SVnodeObj *pVnode) {
|
SCqContext *pContext = handle;
|
||||||
SMeterObj *pObj;
|
|
||||||
dPrint("vid:%d, stream is closed, old role %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole));
|
|
||||||
|
|
||||||
// stop stream computing
|
// stop all CQs
|
||||||
for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
|
cqStop(pContext);
|
||||||
pObj = pVnode->meterList[sid];
|
|
||||||
if (pObj == NULL) continue;
|
// save the meta data
|
||||||
if (pObj->sqlLen > 0 && pObj->pStream) {
|
|
||||||
taos_close_stream(pObj->pStream);
|
// free all resources
|
||||||
pVnode->numOfStreams--;
|
SCqObj *pObj = pContext->pHead;
|
||||||
|
while (pObj) {
|
||||||
|
SCqObj *pTemp = pObj;
|
||||||
|
pObj = pObj->next;
|
||||||
|
free(pTemp);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_destroy(&pContext->mutex);
|
||||||
|
|
||||||
|
cTrace("vgId:%d, CQ is closed", pContext->vgId);
|
||||||
|
free(pContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
void cqStart(void *handle) {
|
||||||
|
SCqContext *pContext = handle;
|
||||||
|
cTrace("vgId:%d, start all CQs", pContext->vgId);
|
||||||
|
if (pContext->dbConn) return;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
|
||||||
|
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
|
||||||
|
if (pContext->dbConn) {
|
||||||
|
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
|
||||||
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
SCqObj *pObj = pContext->pHead;
|
||||||
|
while (pObj) {
|
||||||
|
int64_t lastKey = 0;
|
||||||
|
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
||||||
|
if (pObj->pStream) {
|
||||||
|
pContext->num++;
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->sid, pObj->sqlStr);
|
||||||
|
} else {
|
||||||
|
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->sqlStr);
|
||||||
}
|
}
|
||||||
|
pObj = pObj->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
void cqStop(void *handle) {
|
||||||
|
SCqContext *pContext = handle;
|
||||||
|
cTrace("vgId:%d, stop all CQs", pContext->vgId);
|
||||||
|
if (pContext->dbConn == NULL) return;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
|
||||||
|
SCqObj *pObj = pContext->pHead;
|
||||||
|
while (pObj) {
|
||||||
|
if (pObj->pStream) taos_close_stream(pObj->pStream);
|
||||||
pObj->pStream = NULL;
|
pObj->pStream = NULL;
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->sid, pObj->sqlStr);
|
||||||
|
|
||||||
|
pObj = pObj->next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pContext->dbConn) taos_close(pContext->dbConn);
|
||||||
|
pContext->dbConn = NULL;
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
void cqCreate(SMeterObj *pObj) {
|
void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns) {
|
||||||
if (pObj->sqlLen <= 0) return;
|
SCqContext *pContext = handle;
|
||||||
|
|
||||||
SVnodeObj *pVnode = vnodeList + pObj->vnode;
|
SCqObj *pObj = calloc(sizeof(SCqObj), 1);
|
||||||
|
if (pObj == NULL) return;
|
||||||
|
|
||||||
if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return;
|
pObj->sid = sid;
|
||||||
if (pObj->pStream) return;
|
pObj->sqlStr = malloc(strlen(sqlStr)+1);
|
||||||
|
strcpy(pObj->sqlStr, sqlStr);
|
||||||
|
|
||||||
dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql);
|
pObj->columns = columns;
|
||||||
if (pVnode->dbConn == NULL) {
|
|
||||||
if (pVnode->streamTimer == NULL) taosTmrReset(vnodeOpenStreams, 1000, pVnode, vnodeTmrCtrl, &pVnode->streamTimer);
|
int size = sizeof(SSchema) * columns;
|
||||||
} else {
|
pObj->pSchema = malloc(size);
|
||||||
pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj,
|
memcpy(pObj->pSchema, pSchema, size);
|
||||||
vnodeCloseStreamCallback);
|
|
||||||
if (pObj->pStream) pVnode->numOfStreams++;
|
cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->sid, pObj->sqlStr);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
|
||||||
|
pObj->next = pContext->pHead;
|
||||||
|
pContext->pHead = pObj;
|
||||||
|
|
||||||
|
if (pContext->dbConn) {
|
||||||
|
int64_t lastKey = 0;
|
||||||
|
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
||||||
|
if (pObj->pStream) {
|
||||||
|
pContext->num++;
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->sid, pObj->sqlStr);
|
||||||
|
} else {
|
||||||
|
cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->sid, pObj->sqlStr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close only one stream
|
void cqDrop(void *handle, int sid) {
|
||||||
void cqDrop(SMeterObj *pObj) {
|
SCqContext *pContext = handle;
|
||||||
SVnodeObj *pVnode = vnodeList + pObj->vnode;
|
|
||||||
if (pObj->sqlLen <= 0) return;
|
|
||||||
|
|
||||||
if (pObj->pStream) {
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
taos_close_stream(pObj->pStream);
|
|
||||||
pVnode->numOfStreams--;
|
// locate the pObj;
|
||||||
|
SCqObj *prev = NULL;
|
||||||
|
SCqObj *pObj = pContext->pHead;
|
||||||
|
while (pObj) {
|
||||||
|
if (pObj->sid != sid) {
|
||||||
|
prev = pObj;
|
||||||
|
pObj = pObj->next;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove from the linked list
|
||||||
|
if (prev) {
|
||||||
|
prev->next = pObj->next;
|
||||||
|
} else {
|
||||||
|
pContext->pHead = pObj->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj->pStream = NULL;
|
if (pObj) {
|
||||||
if (pVnode->numOfStreams == 0) {
|
// update the meta data
|
||||||
taos_close(pVnode->dbConn);
|
|
||||||
pVnode->dbConn = NULL;
|
// free the resources associated
|
||||||
|
if (pObj->pStream) taos_close_stream(pObj->pStream);
|
||||||
|
pObj->pStream = NULL;
|
||||||
|
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->sid, pObj->sqlStr);
|
||||||
|
free(pObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("vid:%d sid:%d id:%d stream is removed", pObj->vnode, pObj->sid, pObj->meterId);
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
||||||
SMeterObj *pObj = (SMeterObj *)param;
|
SCqObj *pObj = (SCqObj *)param;
|
||||||
dTrace("vid:%d sid:%d id:%s, stream result is ready", pObj->vnode, pObj->sid, pObj->meterId);
|
SCqContext *pContext = pObj->pContext;
|
||||||
|
if (pObj->pStream == NULL) return;
|
||||||
|
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->sid, pObj->sqlStr);
|
||||||
|
|
||||||
// construct data
|
// construct data
|
||||||
int32_t contLen = pObj->bytesPerPoint;
|
int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize;
|
||||||
char * pTemp = calloc(1, sizeof(SSubmitMsg) + pObj->bytesPerPoint + sizeof(SVMsgHeader));
|
char *buffer = calloc(size, 1);
|
||||||
SSubmitMsg *pMsg = (SSubmitMsg *)(pTemp + sizeof(SVMsgHeader));
|
|
||||||
|
|
||||||
pMsg->numOfRows = htons(1);
|
SWalHead *pHead = (SWalHead *)buffer;
|
||||||
|
pHead->msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||||
|
pHead->len = size - sizeof(SWalHead);
|
||||||
|
|
||||||
|
SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead));
|
||||||
|
// to do: fill in the SSubmitMsg structure
|
||||||
|
pSubmit->numOfBlocks = 1;
|
||||||
|
|
||||||
char ncharBuf[TSDB_MAX_BYTES_PER_ROW] = {0};
|
|
||||||
|
|
||||||
int32_t offset = 0;
|
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
|
||||||
for (int32_t i = 0; i < pObj->numOfColumns; ++i) {
|
// to do: fill in the SSubmitBlk strucuture
|
||||||
char *dst = row[i];
|
pBlk->tid = pObj->sid;
|
||||||
if (dst == NULL) {
|
|
||||||
setNull(pMsg->payLoad + offset, pObj->schema[i].type, pObj->schema[i].bytes);
|
|
||||||
} else {
|
|
||||||
// here, we need to transfer nchar(utf8) to unicode(ucs-4)
|
|
||||||
if (pObj->schema[i].type == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
taosMbsToUcs4(row[i], pObj->schema[i].bytes, ncharBuf, TSDB_MAX_BYTES_PER_ROW);
|
|
||||||
dst = ncharBuf;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(pMsg->payLoad + offset, dst, pObj->schema[i].bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
offset += pObj->schema[i].bytes;
|
// write into vnode write queue
|
||||||
}
|
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ);
|
||||||
|
|
||||||
contLen += sizeof(SSubmitMsg);
|
|
||||||
|
|
||||||
int32_t numOfPoints = 0;
|
|
||||||
int32_t code = vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion,
|
|
||||||
&numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision));
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
dError("vid:%d sid:%d id:%s, failed to insert continuous query results", pObj->vnode, pObj->sid, pObj->meterId);
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(numOfPoints >= 0 && numOfPoints <= 1);
|
|
||||||
tfree(pTemp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeGetDBFromMeterId(SMeterObj *pObj, char *db) {
|
|
||||||
char *st = strstr(pObj->meterId, ".");
|
|
||||||
char *end = strstr(st + 1, ".");
|
|
||||||
|
|
||||||
memcpy(db, st + 1, end - (st + 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,209 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
|
||||||
#include "taosmsg.h"
|
|
||||||
#include "vnode.h"
|
|
||||||
#include "vnodeUtil.h"
|
|
||||||
#include "vnodeStatus.h"
|
|
||||||
|
|
||||||
/* static TAOS *dbConn = NULL; */
|
|
||||||
void vnodeCloseStreamCallback(void *param);
|
|
||||||
|
|
||||||
void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
|
||||||
SMeterObj *pObj = (SMeterObj *)param;
|
|
||||||
dTrace("vid:%d sid:%d id:%s, stream result is ready", pObj->vnode, pObj->sid, pObj->meterId);
|
|
||||||
|
|
||||||
// construct data
|
|
||||||
int32_t contLen = pObj->bytesPerPoint;
|
|
||||||
char * pTemp = calloc(1, sizeof(SSubmitMsg) + pObj->bytesPerPoint + sizeof(SVMsgHeader));
|
|
||||||
SSubmitMsg *pMsg = (SSubmitMsg *)(pTemp + sizeof(SVMsgHeader));
|
|
||||||
|
|
||||||
pMsg->numOfRows = htons(1);
|
|
||||||
|
|
||||||
char ncharBuf[TSDB_MAX_BYTES_PER_ROW] = {0};
|
|
||||||
|
|
||||||
int32_t offset = 0;
|
|
||||||
for (int32_t i = 0; i < pObj->numOfColumns; ++i) {
|
|
||||||
char *dst = row[i];
|
|
||||||
if (dst == NULL) {
|
|
||||||
setNull(pMsg->payLoad + offset, pObj->schema[i].type, pObj->schema[i].bytes);
|
|
||||||
} else {
|
|
||||||
// here, we need to transfer nchar(utf8) to unicode(ucs-4)
|
|
||||||
if (pObj->schema[i].type == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
taosMbsToUcs4(row[i], pObj->schema[i].bytes, ncharBuf, TSDB_MAX_BYTES_PER_ROW);
|
|
||||||
dst = ncharBuf;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(pMsg->payLoad + offset, dst, pObj->schema[i].bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
offset += pObj->schema[i].bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
contLen += sizeof(SSubmitMsg);
|
|
||||||
|
|
||||||
int32_t numOfPoints = 0;
|
|
||||||
int32_t code = vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion,
|
|
||||||
&numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision));
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
dError("vid:%d sid:%d id:%s, failed to insert continuous query results", pObj->vnode, pObj->sid, pObj->meterId);
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(numOfPoints >= 0 && numOfPoints <= 1);
|
|
||||||
tfree(pTemp);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void vnodeGetDBFromMeterId(SMeterObj *pObj, char *db) {
|
|
||||||
char *st = strstr(pObj->meterId, ".");
|
|
||||||
char *end = strstr(st + 1, ".");
|
|
||||||
|
|
||||||
memcpy(db, st + 1, end - (st + 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
void vnodeOpenStreams(void *param, void *tmrId) {
|
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
|
||||||
SMeterObj *pObj;
|
|
||||||
|
|
||||||
if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return;
|
|
||||||
if (pVnode->meterList == NULL) return;
|
|
||||||
|
|
||||||
taosTmrStopA(&pVnode->streamTimer);
|
|
||||||
pVnode->streamTimer = NULL;
|
|
||||||
|
|
||||||
for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
|
|
||||||
pObj = pVnode->meterList[sid];
|
|
||||||
if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) continue;
|
|
||||||
|
|
||||||
dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql);
|
|
||||||
|
|
||||||
if (pVnode->dbConn == NULL) {
|
|
||||||
char db[64] = {0};
|
|
||||||
char user[64] = {0};
|
|
||||||
vnodeGetDBFromMeterId(pObj, db);
|
|
||||||
sprintf(user, "_%s", pVnode->cfg.acct);
|
|
||||||
pVnode->dbConn = taos_connect(NULL, user, tsInternalPass, db, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pVnode->dbConn == NULL) {
|
|
||||||
dError("vid:%d, failed to connect to mgmt node", pVnode->vnode);
|
|
||||||
taosTmrReset(vnodeOpenStreams, 1000, param, vnodeTmrCtrl, &pVnode->streamTimer);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pObj->pStream == NULL) {
|
|
||||||
pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj,
|
|
||||||
vnodeCloseStreamCallback);
|
|
||||||
if (pObj->pStream) pVnode->numOfStreams++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void vnodeCreateStream(SMeterObj *pObj) {
|
|
||||||
if (pObj->sqlLen <= 0) return;
|
|
||||||
|
|
||||||
SVnodeObj *pVnode = vnodeList + pObj->vnode;
|
|
||||||
|
|
||||||
if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return;
|
|
||||||
if (pObj->pStream) return;
|
|
||||||
|
|
||||||
dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql);
|
|
||||||
if (pVnode->dbConn == NULL) {
|
|
||||||
if (pVnode->streamTimer == NULL) taosTmrReset(vnodeOpenStreams, 1000, pVnode, vnodeTmrCtrl, &pVnode->streamTimer);
|
|
||||||
} else {
|
|
||||||
pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj,
|
|
||||||
vnodeCloseStreamCallback);
|
|
||||||
if (pObj->pStream) pVnode->numOfStreams++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close only one stream
|
|
||||||
void vnodeRemoveStream(SMeterObj *pObj) {
|
|
||||||
SVnodeObj *pVnode = vnodeList + pObj->vnode;
|
|
||||||
if (pObj->sqlLen <= 0) return;
|
|
||||||
|
|
||||||
if (pObj->pStream) {
|
|
||||||
taos_close_stream(pObj->pStream);
|
|
||||||
pVnode->numOfStreams--;
|
|
||||||
}
|
|
||||||
|
|
||||||
pObj->pStream = NULL;
|
|
||||||
if (pVnode->numOfStreams == 0) {
|
|
||||||
taos_close(pVnode->dbConn);
|
|
||||||
pVnode->dbConn = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
dTrace("vid:%d sid:%d id:%d stream is removed", pObj->vnode, pObj->sid, pObj->meterId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close all streams in a vnode
|
|
||||||
void vnodeCloseStream(SVnodeObj *pVnode) {
|
|
||||||
SMeterObj *pObj;
|
|
||||||
dPrint("vid:%d, stream is closed, old role %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole));
|
|
||||||
|
|
||||||
// stop stream computing
|
|
||||||
for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
|
|
||||||
pObj = pVnode->meterList[sid];
|
|
||||||
if (pObj == NULL) continue;
|
|
||||||
if (pObj->sqlLen > 0 && pObj->pStream) {
|
|
||||||
taos_close_stream(pObj->pStream);
|
|
||||||
pVnode->numOfStreams--;
|
|
||||||
}
|
|
||||||
pObj->pStream = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void vnodeUpdateStreamRole(SVnodeObj *pVnode) {
|
|
||||||
/* SMeterObj *pObj; */
|
|
||||||
|
|
||||||
int newRole = (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) ? TSDB_VN_STREAM_STATUS_START : TSDB_VN_STREAM_STATUS_STOP;
|
|
||||||
if (newRole != pVnode->streamRole) {
|
|
||||||
dPrint("vid:%d, stream role is changed from %s to %s",
|
|
||||||
pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole), taosGetVnodeStreamStatusStr(newRole));
|
|
||||||
pVnode->streamRole = newRole;
|
|
||||||
if (newRole == TSDB_VN_STREAM_STATUS_START) {
|
|
||||||
vnodeOpenStreams(pVnode, NULL);
|
|
||||||
} else {
|
|
||||||
vnodeCloseStream(pVnode);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
dPrint("vid:%d, stream role is keep to %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Callback function called from client
|
|
||||||
void vnodeCloseStreamCallback(void *param) {
|
|
||||||
SMeterObj *pMeter = (SMeterObj *)param;
|
|
||||||
SVnodeObj *pVnode = NULL;
|
|
||||||
|
|
||||||
if (pMeter == NULL || pMeter->sqlLen == 0) return;
|
|
||||||
pVnode = vnodeList + pMeter->vnode;
|
|
||||||
|
|
||||||
pMeter->sqlLen = 0;
|
|
||||||
pMeter->pSql = NULL;
|
|
||||||
pMeter->pStream = NULL;
|
|
||||||
|
|
||||||
pVnode->numOfStreams--;
|
|
||||||
|
|
||||||
if (pVnode->numOfStreams == 0) {
|
|
||||||
taos_close(pVnode->dbConn);
|
|
||||||
pVnode->dbConn = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
vnodeSaveMeterObjToFile(pMeter);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||||
|
PROJECT(TDengine)
|
||||||
|
|
||||||
|
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
|
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
|
||||||
|
INCLUDE_DIRECTORIES(../inc)
|
||||||
|
|
||||||
|
LIST(APPEND CQTEST_SRC ./cqtest.c)
|
||||||
|
ADD_EXECUTABLE(cqtest ${CQTEST_SRC})
|
||||||
|
TARGET_LINK_LIBRARIES(cqtest tcq)
|
||||||
|
|
||||||
|
ENDIF ()
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
//#define _DEFAULT_SOURCE
|
||||||
|
#include "os.h"
|
||||||
|
#include "taosdef.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "tcq.h"
|
||||||
|
|
||||||
|
int64_t ver = 0;
|
||||||
|
void *pCq = NULL;
|
||||||
|
|
||||||
|
int writeToQueue(void *pVnode, void *data, int type) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
char path[128] = "~/cq";
|
||||||
|
|
||||||
|
for (int i=1; i<argc; ++i) {
|
||||||
|
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||||
|
strcpy(path, argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||||
|
ddebugFlag = atoi(argv[++i]);
|
||||||
|
} else {
|
||||||
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
|
printf(" [-p path]: wal file path default is:%s\n", path);
|
||||||
|
printf(" [-d debugFlag]: debug flag, default:%d\n", ddebugFlag);
|
||||||
|
printf(" [-h help]: print out this help\n\n");
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosInitLog("cq.log", 100000, 10);
|
||||||
|
|
||||||
|
SCqCfg cqCfg;
|
||||||
|
strcpy(cqCfg.user, "root");
|
||||||
|
strcpy(cqCfg.pass, "taosdata");
|
||||||
|
strcpy(cqCfg.path, path);
|
||||||
|
cqCfg.vgId = 2;
|
||||||
|
cqCfg.cqWrite = writeToQueue;
|
||||||
|
|
||||||
|
pCq = cqOpen(NULL, &cqCfg);
|
||||||
|
if (pCq == NULL) {
|
||||||
|
printf("failed to open CQ\n");
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchema *pSchema = NULL;
|
||||||
|
for (int sid =1; sid<10; ++sid) {
|
||||||
|
cqCreate(pCq, 1, "select avg(speed) from t1 sliding(1s) interval(5s)", pSchema, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
char c = getchar();
|
||||||
|
|
||||||
|
switch(c) {
|
||||||
|
case 's':
|
||||||
|
cqStart(pCq);
|
||||||
|
break;
|
||||||
|
case 't':
|
||||||
|
cqStop(pCq);
|
||||||
|
break;
|
||||||
|
case 'c':
|
||||||
|
// create a CQ
|
||||||
|
break;
|
||||||
|
case 'd':
|
||||||
|
// drop a CQ
|
||||||
|
break;
|
||||||
|
case 'q':
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c=='q') break;
|
||||||
|
}
|
||||||
|
|
||||||
|
cqClose(pCq);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -335,6 +335,11 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
||||||
#define TSDB_PORT_MNODEDNODE 15
|
#define TSDB_PORT_MNODEDNODE 15
|
||||||
#define TSDB_PORT_SYNC 20
|
#define TSDB_PORT_SYNC 20
|
||||||
|
|
||||||
|
#define TAOS_QTYPE_RPC 0
|
||||||
|
#define TAOS_QTYPE_FWD 1
|
||||||
|
#define TAOS_QTYPE_WAL 2
|
||||||
|
#define TAOS_QTYPE_CQ 3
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_PRECISION_MILLI,
|
TSDB_PRECISION_MILLI,
|
||||||
TSDB_PRECISION_MICRO,
|
TSDB_PRECISION_MICRO,
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#ifndef _TD_CQ_H_
|
||||||
|
#define _TD_CQ_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
typedef int (*FCqWrite)(void *ahandle, void *pHead, int type);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int vgId;
|
||||||
|
char path[TSDB_FILENAME_LEN];
|
||||||
|
char user[TSDB_USER_LEN];
|
||||||
|
char pass[TSDB_PASSWORD_LEN];
|
||||||
|
FCqWrite cqWrite;
|
||||||
|
} SCqCfg;
|
||||||
|
|
||||||
|
void *cqOpen(void *ahandle, const SCqCfg *pCfg);
|
||||||
|
void cqClose(void *handle);
|
||||||
|
void cqStart(void *handle);
|
||||||
|
void cqStop(void *handle);
|
||||||
|
void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns);
|
||||||
|
void cqDrop(void *handle, int sid);
|
||||||
|
|
||||||
|
extern int cqDebugFlag;
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // _TD_CQ_H_
|
|
@ -20,10 +20,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define TAOS_QTYPE_RPC 0
|
|
||||||
#define TAOS_QTYPE_FWD 1
|
|
||||||
#define TAOS_QTYPE_WAL 2
|
|
||||||
|
|
||||||
typedef void* taos_queue;
|
typedef void* taos_queue;
|
||||||
typedef void* taos_qset;
|
typedef void* taos_qset;
|
||||||
typedef void* taos_qall;
|
typedef void* taos_qall;
|
||||||
|
|
|
@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
AUX_SOURCE_DIRECTORY(src SRC)
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
|
|
||||||
ADD_LIBRARY(vnode ${SRC})
|
ADD_LIBRARY(vnode ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(vnode tsdb)
|
TARGET_LINK_LIBRARIES(vnode tsdb tcq)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
|
@ -30,6 +30,8 @@
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
#include "vnodeLog.h"
|
#include "vnodeLog.h"
|
||||||
|
#include "tcq.h"
|
||||||
|
//#include "tsync.h"
|
||||||
|
|
||||||
static int32_t tsOpennedVnodes;
|
static int32_t tsOpennedVnodes;
|
||||||
static void *tsDnodeVnodesHash;
|
static void *tsDnodeVnodesHash;
|
||||||
|
@ -192,8 +194,27 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
||||||
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
||||||
|
|
||||||
|
STsdbAppH appH = {0};
|
||||||
|
appH.appH = (void *)pVnode;
|
||||||
|
appH.walCallBack = vnodeWalCallback;
|
||||||
|
|
||||||
|
sprintf(temp, "%s/tsdb", rootDir);
|
||||||
|
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
|
||||||
|
if (pVnode->tsdb == NULL) {
|
||||||
|
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
|
||||||
|
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
sprintf(temp, "%s/wal", rootDir);
|
sprintf(temp, "%s/wal", rootDir);
|
||||||
pVnode->wal = walOpen(temp, &pVnode->walCfg);
|
pVnode->wal = walOpen(temp, &pVnode->walCfg);
|
||||||
|
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
||||||
|
|
||||||
|
SCqCfg cqCfg;
|
||||||
|
sprintf(cqCfg.path, "%s/cq", rootDir);
|
||||||
|
strcpy(cqCfg.pass, tsInternalPass);
|
||||||
|
cqCfg.cqWrite = vnodeWriteToQueue;
|
||||||
|
pVnode->cq = cqOpen(pVnode, &cqCfg);
|
||||||
|
|
||||||
SSyncInfo syncInfo;
|
SSyncInfo syncInfo;
|
||||||
syncInfo.vgId = pVnode->vgId;
|
syncInfo.vgId = pVnode->vgId;
|
||||||
|
@ -208,24 +229,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
syncInfo.notifyRole = vnodeNotifyRole;
|
syncInfo.notifyRole = vnodeNotifyRole;
|
||||||
pVnode->sync = syncStart(&syncInfo);
|
pVnode->sync = syncStart(&syncInfo);
|
||||||
|
|
||||||
|
// start continuous query
|
||||||
|
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
|
||||||
|
cqStart(pVnode->cq);
|
||||||
|
|
||||||
pVnode->events = NULL;
|
pVnode->events = NULL;
|
||||||
pVnode->cq = NULL;
|
|
||||||
|
|
||||||
STsdbAppH appH = {0};
|
|
||||||
appH.appH = (void *)pVnode;
|
|
||||||
appH.walCallBack = vnodeWalCallback;
|
|
||||||
|
|
||||||
sprintf(temp, "%s/tsdb", rootDir);
|
|
||||||
void *pTsdb = tsdbOpenRepo(temp, &appH);
|
|
||||||
if (pTsdb == NULL) {
|
|
||||||
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
|
|
||||||
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
pVnode->tsdb = pTsdb;
|
|
||||||
|
|
||||||
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
|
||||||
|
|
||||||
pVnode->status = TAOS_VN_STATUS_READY;
|
pVnode->status = TAOS_VN_STATUS_READY;
|
||||||
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
|
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
|
||||||
|
@ -350,10 +358,16 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
|
||||||
pVnode->sync = NULL;
|
pVnode->sync = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbCloseRepo(pVnode->tsdb);
|
cqClose(pVnode->cq);
|
||||||
walClose(pVnode->wal);
|
pVnode->cq = NULL;
|
||||||
vnodeSaveVersion(pVnode);
|
|
||||||
|
|
||||||
|
tsdbCloseRepo(pVnode->tsdb);
|
||||||
|
pVnode->tsdb = NULL;
|
||||||
|
|
||||||
|
walClose(pVnode->wal);
|
||||||
|
pVnode->wal = NULL;
|
||||||
|
|
||||||
|
vnodeSaveVersion(pVnode);
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,6 +391,11 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) {
|
||||||
static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
||||||
SVnodeObj *pVnode = ahandle;
|
SVnodeObj *pVnode = ahandle;
|
||||||
pVnode->role = role;
|
pVnode->role = role;
|
||||||
|
|
||||||
|
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
|
||||||
|
cqStart(pVnode->cq);
|
||||||
|
else
|
||||||
|
cqStop(pVnode->cq);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
#include "vnodeLog.h"
|
#include "vnodeLog.h"
|
||||||
|
#include "tcq.h"
|
||||||
|
|
||||||
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
|
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
|
||||||
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
||||||
|
@ -113,6 +114,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
||||||
int16_t numOfColumns = htons(pTable->numOfColumns);
|
int16_t numOfColumns = htons(pTable->numOfColumns);
|
||||||
int16_t numOfTags = htons(pTable->numOfTags);
|
int16_t numOfTags = htons(pTable->numOfTags);
|
||||||
int32_t sid = htonl(pTable->sid);
|
int32_t sid = htonl(pTable->sid);
|
||||||
|
int32_t sqlDataLen = htonl(pTable->sqlDataLen);
|
||||||
uint64_t uid = htobe64(pTable->uid);
|
uint64_t uid = htobe64(pTable->uid);
|
||||||
SSchema *pSchema = (SSchema *) pTable->data;
|
SSchema *pSchema = (SSchema *) pTable->data;
|
||||||
|
|
||||||
|
@ -150,6 +152,13 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
||||||
|
|
||||||
code = tsdbCreateTable(pVnode->tsdb, &tCfg);
|
code = tsdbCreateTable(pVnode->tsdb, &tCfg);
|
||||||
|
|
||||||
|
if (code == 0 && sqlDataLen >0) {
|
||||||
|
char *sqlStr = NULL;
|
||||||
|
// to do: get the sqlStr
|
||||||
|
|
||||||
|
cqCreate(pVnode->cq, sid, sqlStr, pSchema, numOfColumns);
|
||||||
|
}
|
||||||
|
|
||||||
tfree(pDestSchema);
|
tfree(pDestSchema);
|
||||||
|
|
||||||
dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code);
|
dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code);
|
||||||
|
@ -167,6 +176,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
|
||||||
};
|
};
|
||||||
|
|
||||||
code = tsdbDropTable(pVnode->tsdb, tableId);
|
code = tsdbDropTable(pVnode->tsdb, tableId);
|
||||||
|
cqDrop(pVnode->cq, tableId.tid);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue