[td-168] merge develop
This commit is contained in:
commit
c7215cf1c8
|
@ -14,5 +14,6 @@ ADD_SUBDIRECTORY(mnode)
|
||||||
ADD_SUBDIRECTORY(vnode)
|
ADD_SUBDIRECTORY(vnode)
|
||||||
ADD_SUBDIRECTORY(tsdb)
|
ADD_SUBDIRECTORY(tsdb)
|
||||||
ADD_SUBDIRECTORY(wal)
|
ADD_SUBDIRECTORY(wal)
|
||||||
|
ADD_SUBDIRECTORY(cq)
|
||||||
ADD_SUBDIRECTORY(dnode)
|
ADD_SUBDIRECTORY(dnode)
|
||||||
ADD_SUBDIRECTORY(connector/jdbc)
|
ADD_SUBDIRECTORY(connector/jdbc)
|
||||||
|
|
|
@ -229,6 +229,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||||
}
|
}
|
||||||
} else if (pInfo->type == TSDB_SQL_DROP_DNODE) {
|
} else if (pInfo->type == TSDB_SQL_DROP_DNODE) {
|
||||||
|
pzName->n = strdequote(pzName->z);
|
||||||
strncpy(pTableMetaInfo->name, pzName->z, pzName->n);
|
strncpy(pTableMetaInfo->name, pzName->z, pzName->n);
|
||||||
} else { // drop user
|
} else { // drop user
|
||||||
if (pzName->n > TSDB_USER_LEN) {
|
if (pzName->n > TSDB_USER_LEN) {
|
||||||
|
|
|
@ -140,7 +140,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
|
||||||
// offset of cmd in SSqlObj structure
|
// offset of cmd in SSqlObj structure
|
||||||
char *pSqlObjAddr = (char *)pCmd - offsetof(SSqlObj, cmd);
|
char *pSqlObjAddr = (char *)pCmd - offsetof(SSqlObj, cmd);
|
||||||
|
|
||||||
if (pMemBuffer == NULL || pDesc->pColumnModel == NULL) {
|
if (pMemBuffer == NULL) {
|
||||||
|
tscError("%p pMemBuffer", pMemBuffer);
|
||||||
|
pRes->code = TSDB_CODE_APP_ERROR;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDesc->pColumnModel == NULL) {
|
||||||
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
|
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
|
||||||
|
|
||||||
tscError("%p no local buffer or intermediate result format model", pSqlObjAddr);
|
tscError("%p no local buffer or intermediate result format model", pSqlObjAddr);
|
||||||
|
|
|
@ -231,7 +231,11 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
|
|
||||||
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
||||||
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
|
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
if (pSql == NULL) {
|
||||||
|
tscError("%p sql is already released", pSql->signature);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (pSql->signature != pSql) {
|
||||||
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
|
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -313,7 +317,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
||||||
pRes->rspType = rpcMsg->msgType;
|
pRes->rspType = rpcMsg->msgType;
|
||||||
pRes->rspLen = rpcMsg->contLen;
|
pRes->rspLen = rpcMsg->contLen;
|
||||||
|
|
||||||
if (pRes->rspLen > 0) {
|
if (pRes->rspLen > 0 && rpcMsg->pCont) {
|
||||||
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
|
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -220,8 +220,9 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
|
||||||
void taos_close(TAOS *taos) {
|
void taos_close(TAOS *taos) {
|
||||||
STscObj *pObj = (STscObj *)taos;
|
STscObj *pObj = (STscObj *)taos;
|
||||||
|
|
||||||
if (pObj == NULL) return;
|
if (pObj == NULL || pObj->signature != pObj) {
|
||||||
if (pObj->signature != pObj) return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (pObj->pHb != NULL) {
|
if (pObj->pHb != NULL) {
|
||||||
tscSetFreeHeatBeat(pObj);
|
tscSetFreeHeatBeat(pObj);
|
||||||
|
|
|
@ -172,17 +172,17 @@ static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql) {
|
||||||
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) {
|
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) {
|
||||||
SSqlStream * pStream = (SSqlStream *)param;
|
SSqlStream * pStream = (SSqlStream *)param;
|
||||||
SSqlObj * pSql = (SSqlObj *)res;
|
SSqlObj * pSql = (SSqlObj *)res;
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
|
||||||
|
|
||||||
if (pSql == NULL || numOfRows < 0) {
|
if (pSql == NULL || numOfRows < 0) {
|
||||||
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
|
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
|
||||||
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
|
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
|
||||||
tscClearTableMetaInfo(pTableMetaInfo, true);
|
|
||||||
|
|
||||||
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
|
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
||||||
|
|
||||||
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
|
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
|
||||||
pStream->numOfRes += numOfRows;
|
pStream->numOfRes += numOfRows;
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||||
|
|
|
@ -710,18 +710,20 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscCloseTscObj(STscObj* pObj) {
|
void tscCloseTscObj(STscObj* pObj) {
|
||||||
|
assert(pObj != NULL);
|
||||||
|
|
||||||
pObj->signature = NULL;
|
pObj->signature = NULL;
|
||||||
SSqlObj* pSql = pObj->pSql;
|
SSqlObj* pSql = pObj->pSql;
|
||||||
|
|
||||||
if (pSql) {
|
if (pSql) {
|
||||||
terrno = pSql->res.code;
|
terrno = pSql->res.code;
|
||||||
|
sem_destroy(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTmrStopA(&(pObj->pTimer));
|
taosTmrStopA(&(pObj->pTimer));
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
|
|
||||||
sem_destroy(&pSql->rspSem);
|
|
||||||
rpcClose(pObj->pMgmtConn);
|
rpcClose(pObj->pMgmtConn);
|
||||||
|
|
||||||
pthread_mutex_destroy(&pObj->mutex);
|
pthread_mutex_destroy(&pObj->mutex);
|
||||||
|
|
||||||
tscTrace("%p DB connection is closed", pObj);
|
tscTrace("%p DB connection is closed", pObj);
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||||
|
PROJECT(TDengine)
|
||||||
|
|
||||||
|
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
|
||||||
|
INCLUDE_DIRECTORIES(inc)
|
||||||
|
|
||||||
|
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
|
||||||
|
|
||||||
|
ADD_LIBRARY(tcq ${SRC})
|
||||||
|
TARGET_LINK_LIBRARIES(tcq tutil common taos)
|
||||||
|
|
||||||
|
ADD_SUBDIRECTORY(test)
|
||||||
|
|
|
@ -0,0 +1,249 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include "taosdef.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "twal.h"
|
||||||
|
#include "tcq.h"
|
||||||
|
#include "taos.h"
|
||||||
|
|
||||||
|
#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);}
|
||||||
|
#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);}
|
||||||
|
#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);}
|
||||||
|
#define cPrint(...) {taosPrintLog("WAL ", 255, __VA_ARGS__);}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int vgId;
|
||||||
|
char user[TSDB_USER_LEN];
|
||||||
|
char pass[TSDB_PASSWORD_LEN];
|
||||||
|
FCqWrite cqWrite;
|
||||||
|
void *ahandle;
|
||||||
|
int num; // number of continuous streams
|
||||||
|
struct SCqObj *pHead;
|
||||||
|
void *dbConn;
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
} SCqContext;
|
||||||
|
|
||||||
|
typedef struct SCqObj {
|
||||||
|
int tid; // table ID
|
||||||
|
int rowSize; // bytes of a row
|
||||||
|
char *sqlStr; // SQL string
|
||||||
|
int columns; // number of columns
|
||||||
|
SSchema *pSchema; // pointer to schema array
|
||||||
|
void *pStream;
|
||||||
|
struct SCqObj *prev;
|
||||||
|
struct SCqObj *next;
|
||||||
|
SCqContext *pContext;
|
||||||
|
} SCqObj;
|
||||||
|
|
||||||
|
int cqDebugFlag = 135;
|
||||||
|
|
||||||
|
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
|
||||||
|
|
||||||
|
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
||||||
|
|
||||||
|
SCqContext *pContext = calloc(sizeof(SCqContext), 1);
|
||||||
|
if (pContext == NULL) return NULL;
|
||||||
|
|
||||||
|
strcpy(pContext->user, pCfg->user);
|
||||||
|
strcpy(pContext->pass, pCfg->pass);
|
||||||
|
pContext->vgId = pCfg->vgId;
|
||||||
|
pContext->cqWrite = pCfg->cqWrite;
|
||||||
|
pContext->ahandle = ahandle;
|
||||||
|
|
||||||
|
pthread_mutex_init(&pContext->mutex, NULL);
|
||||||
|
|
||||||
|
cTrace("vgId:%d, CQ is opened", pContext->vgId);
|
||||||
|
|
||||||
|
return pContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
void cqClose(void *handle) {
|
||||||
|
SCqContext *pContext = handle;
|
||||||
|
|
||||||
|
// stop all CQs
|
||||||
|
cqStop(pContext);
|
||||||
|
|
||||||
|
// free all resources
|
||||||
|
SCqObj *pObj = pContext->pHead;
|
||||||
|
while (pObj) {
|
||||||
|
SCqObj *pTemp = pObj;
|
||||||
|
pObj = pObj->next;
|
||||||
|
free(pTemp);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_destroy(&pContext->mutex);
|
||||||
|
|
||||||
|
cTrace("vgId:%d, CQ is closed", pContext->vgId);
|
||||||
|
free(pContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
void cqStart(void *handle) {
|
||||||
|
SCqContext *pContext = handle;
|
||||||
|
cTrace("vgId:%d, start all CQs", pContext->vgId);
|
||||||
|
if (pContext->dbConn) return;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
|
||||||
|
tscEmbedded = 1;
|
||||||
|
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
|
||||||
|
if (pContext->dbConn == NULL) {
|
||||||
|
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
|
||||||
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCqObj *pObj = pContext->pHead;
|
||||||
|
while (pObj) {
|
||||||
|
int64_t lastKey = 0;
|
||||||
|
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
||||||
|
if (pObj->pStream) {
|
||||||
|
pContext->num++;
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
|
} else {
|
||||||
|
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
|
}
|
||||||
|
pObj = pObj->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
void cqStop(void *handle) {
|
||||||
|
SCqContext *pContext = handle;
|
||||||
|
cTrace("vgId:%d, stop all CQs", pContext->vgId);
|
||||||
|
if (pContext->dbConn == NULL) return;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
|
||||||
|
SCqObj *pObj = pContext->pHead;
|
||||||
|
while (pObj) {
|
||||||
|
if (pObj->pStream) {
|
||||||
|
taos_close_stream(pObj->pStream);
|
||||||
|
pObj->pStream = NULL;
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
pObj = pObj->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pContext->dbConn) taos_close(pContext->dbConn);
|
||||||
|
pContext->dbConn = NULL;
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int columns) {
|
||||||
|
SCqContext *pContext = handle;
|
||||||
|
|
||||||
|
SCqObj *pObj = calloc(sizeof(SCqObj), 1);
|
||||||
|
if (pObj == NULL) return NULL;
|
||||||
|
|
||||||
|
pObj->tid = tid;
|
||||||
|
pObj->sqlStr = malloc(strlen(sqlStr)+1);
|
||||||
|
strcpy(pObj->sqlStr, sqlStr);
|
||||||
|
|
||||||
|
pObj->columns = columns;
|
||||||
|
|
||||||
|
int size = sizeof(SSchema) * columns;
|
||||||
|
pObj->pSchema = malloc(size);
|
||||||
|
memcpy(pObj->pSchema, pSchema, size);
|
||||||
|
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
|
||||||
|
pObj->next = pContext->pHead;
|
||||||
|
if (pContext->pHead) pContext->pHead->prev = pObj;
|
||||||
|
pContext->pHead = pObj;
|
||||||
|
|
||||||
|
if (pContext->dbConn) {
|
||||||
|
int64_t lastKey = 0;
|
||||||
|
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
||||||
|
if (pObj->pStream) {
|
||||||
|
pContext->num++;
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
|
} else {
|
||||||
|
cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
|
|
||||||
|
return pObj;
|
||||||
|
}
|
||||||
|
|
||||||
|
void cqDrop(void *handle) {
|
||||||
|
SCqObj *pObj = handle;
|
||||||
|
SCqContext *pContext = pObj->pContext;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
|
||||||
|
if (pObj->prev) {
|
||||||
|
pObj->prev->next = pObj->next;
|
||||||
|
} else {
|
||||||
|
pContext->pHead = pObj->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pObj->next) {
|
||||||
|
pObj->next->prev = pObj->prev;
|
||||||
|
}
|
||||||
|
|
||||||
|
// free the resources associated
|
||||||
|
if (pObj->pStream) taos_close_stream(pObj->pStream);
|
||||||
|
pObj->pStream = NULL;
|
||||||
|
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
|
free(pObj);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
||||||
|
SCqObj *pObj = (SCqObj *)param;
|
||||||
|
SCqContext *pContext = pObj->pContext;
|
||||||
|
if (pObj->pStream == NULL) return;
|
||||||
|
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
|
|
||||||
|
// construct data
|
||||||
|
int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize;
|
||||||
|
char *buffer = calloc(size, 1);
|
||||||
|
|
||||||
|
SWalHead *pHead = (SWalHead *)buffer;
|
||||||
|
pHead->msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||||
|
pHead->len = size - sizeof(SWalHead);
|
||||||
|
|
||||||
|
SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead));
|
||||||
|
// to do: fill in the SSubmitMsg structure
|
||||||
|
pSubmit->numOfBlocks = 1;
|
||||||
|
|
||||||
|
|
||||||
|
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
|
||||||
|
// to do: fill in the SSubmitBlk strucuture
|
||||||
|
pBlk->tid = pObj->tid;
|
||||||
|
|
||||||
|
|
||||||
|
// write into vnode write queue
|
||||||
|
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ);
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||||
|
PROJECT(TDengine)
|
||||||
|
|
||||||
|
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
|
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
|
||||||
|
INCLUDE_DIRECTORIES(../inc)
|
||||||
|
|
||||||
|
LIST(APPEND CQTEST_SRC ./cqtest.c)
|
||||||
|
ADD_EXECUTABLE(cqtest ${CQTEST_SRC})
|
||||||
|
TARGET_LINK_LIBRARIES(cqtest tcq)
|
||||||
|
|
||||||
|
ENDIF ()
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,107 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
//#define _DEFAULT_SOURCE
|
||||||
|
#include "os.h"
|
||||||
|
#include "taosdef.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "tcq.h"
|
||||||
|
|
||||||
|
int64_t ver = 0;
|
||||||
|
void *pCq = NULL;
|
||||||
|
|
||||||
|
int writeToQueue(void *pVnode, void *data, int type) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
int num = 3;
|
||||||
|
|
||||||
|
for (int i=1; i<argc; ++i) {
|
||||||
|
if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||||
|
ddebugFlag = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-n") == 0 && i <argc-1) {
|
||||||
|
num = atoi(argv[++i]);
|
||||||
|
} else {
|
||||||
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
|
printf(" [-n num]: number of streams, default:%d\n", num);
|
||||||
|
printf(" [-d debugFlag]: debug flag, default:%d\n", ddebugFlag);
|
||||||
|
printf(" [-h help]: print out this help\n\n");
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosInitLog("cq.log", 100000, 10);
|
||||||
|
|
||||||
|
SCqCfg cqCfg;
|
||||||
|
strcpy(cqCfg.user, "root");
|
||||||
|
strcpy(cqCfg.pass, "taosdata");
|
||||||
|
cqCfg.vgId = 2;
|
||||||
|
cqCfg.cqWrite = writeToQueue;
|
||||||
|
|
||||||
|
pCq = cqOpen(NULL, &cqCfg);
|
||||||
|
if (pCq == NULL) {
|
||||||
|
printf("failed to open CQ\n");
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchema schema[2];
|
||||||
|
schema[0].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
strcpy(schema[0].name, "ts");
|
||||||
|
schema[0].colId = 0;
|
||||||
|
schema[0].bytes = 8;
|
||||||
|
|
||||||
|
schema[1].type = TSDB_DATA_TYPE_INT;
|
||||||
|
strcpy(schema[1].name, "avgspeed");
|
||||||
|
schema[1].colId = 1;
|
||||||
|
schema[1].bytes = 4;
|
||||||
|
|
||||||
|
for (int sid =1; sid<10; ++sid) {
|
||||||
|
cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", schema, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
char c = getchar();
|
||||||
|
|
||||||
|
switch(c) {
|
||||||
|
case 's':
|
||||||
|
cqStart(pCq);
|
||||||
|
break;
|
||||||
|
case 't':
|
||||||
|
cqStop(pCq);
|
||||||
|
break;
|
||||||
|
case 'c':
|
||||||
|
// create a CQ
|
||||||
|
break;
|
||||||
|
case 'd':
|
||||||
|
// drop a CQ
|
||||||
|
break;
|
||||||
|
case 'q':
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
printf("invalid command:%c", c);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c=='q') break;
|
||||||
|
}
|
||||||
|
|
||||||
|
cqClose(pCq);
|
||||||
|
|
||||||
|
taosCloseLog();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -213,6 +213,7 @@ static void dnodeCheckDataDirOpenned(char *dir) {
|
||||||
int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
|
int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
dError("failed to lock file:%s ret:%d, database may be running, quit", filepath, ret);
|
dError("failed to lock file:%s ret:%d, database may be running, quit", filepath, ret);
|
||||||
|
close(fd);
|
||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -338,6 +338,11 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
||||||
#define TSDB_PORT_MNODEDNODE 15
|
#define TSDB_PORT_MNODEDNODE 15
|
||||||
#define TSDB_PORT_SYNC 20
|
#define TSDB_PORT_SYNC 20
|
||||||
|
|
||||||
|
#define TAOS_QTYPE_RPC 0
|
||||||
|
#define TAOS_QTYPE_FWD 1
|
||||||
|
#define TAOS_QTYPE_WAL 2
|
||||||
|
#define TAOS_QTYPE_CQ 3
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_PRECISION_MILLI,
|
TSDB_PRECISION_MILLI,
|
||||||
TSDB_PRECISION_MICRO,
|
TSDB_PRECISION_MICRO,
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#ifndef _TD_CQ_H_
|
||||||
|
#define _TD_CQ_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
typedef int (*FCqWrite)(void *ahandle, void *pHead, int type);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int vgId;
|
||||||
|
char user[TSDB_USER_LEN];
|
||||||
|
char pass[TSDB_PASSWORD_LEN];
|
||||||
|
FCqWrite cqWrite;
|
||||||
|
} SCqCfg;
|
||||||
|
|
||||||
|
// the following API shall be called by vnode
|
||||||
|
void *cqOpen(void *ahandle, const SCqCfg *pCfg);
|
||||||
|
void cqClose(void *handle);
|
||||||
|
|
||||||
|
// if vnode is master, vnode call this API to start CQ
|
||||||
|
void cqStart(void *handle);
|
||||||
|
|
||||||
|
// if vnode is slave/unsynced, vnode shall call this API to stop CQ
|
||||||
|
void cqStop(void *handle);
|
||||||
|
|
||||||
|
// cqCreate is called by TSDB to start an instance of CQ
|
||||||
|
void *cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns);
|
||||||
|
|
||||||
|
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
|
||||||
|
void cqDrop(void *handle);
|
||||||
|
|
||||||
|
extern int cqDebugFlag;
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // _TD_CQ_H_
|
|
@ -38,9 +38,9 @@ extern "C" {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
// WAL handle
|
// WAL handle
|
||||||
void *appH;
|
void *appH;
|
||||||
|
void *cqH;
|
||||||
int (*walCallBack)(void *);
|
int (*walCallBack)(void *);
|
||||||
int (*eventCallBack)(void *);
|
int (*eventCallBack)(void *);
|
||||||
int (*cqueryCallBack)(void *);
|
|
||||||
} STsdbAppH;
|
} STsdbAppH;
|
||||||
|
|
||||||
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
|
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
|
||||||
|
|
|
@ -142,6 +142,7 @@ static void shellSourceFile(TAOS *con, char *fptr) {
|
||||||
|
|
||||||
if (wordexp(fptr, &full_path, 0) != 0) {
|
if (wordexp(fptr, &full_path, 0) != 0) {
|
||||||
fprintf(stderr, "ERROR: illegal file name\n");
|
fprintf(stderr, "ERROR: illegal file name\n");
|
||||||
|
free(cmd);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,13 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
if (arg) arguments->password = arg;
|
if (arg) arguments->password = arg;
|
||||||
break;
|
break;
|
||||||
case 'P':
|
case 'P':
|
||||||
|
if (arg) {
|
||||||
tsMnodeShellPort = atoi(arg);
|
tsMnodeShellPort = atoi(arg);
|
||||||
|
} else {
|
||||||
|
fprintf(stderr, "Invalid port\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
case 't':
|
case 't':
|
||||||
arguments->timezone = arg;
|
arguments->timezone = arg;
|
||||||
|
@ -101,7 +107,12 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
wordfree(&full_path);
|
wordfree(&full_path);
|
||||||
break;
|
break;
|
||||||
case 'T':
|
case 'T':
|
||||||
|
if (arg) {
|
||||||
arguments->threadNum = atoi(arg);
|
arguments->threadNum = atoi(arg);
|
||||||
|
} else {
|
||||||
|
fprintf(stderr, "Invalid number of threads\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case 'd':
|
case 'd':
|
||||||
arguments->database = arg;
|
arguments->database = arg;
|
||||||
|
|
|
@ -340,6 +340,9 @@ int main(int argc, char *argv[]) {
|
||||||
int count_data_type = 0;
|
int count_data_type = 0;
|
||||||
char dataString[512];
|
char dataString[512];
|
||||||
bool do_aggreFunc = true;
|
bool do_aggreFunc = true;
|
||||||
|
|
||||||
|
memset(dataString, 0, 512);
|
||||||
|
|
||||||
if (strcasecmp(data_type[0], "BINARY") == 0 || strcasecmp(data_type[0], "BOOL") == 0) {
|
if (strcasecmp(data_type[0], "BINARY") == 0 || strcasecmp(data_type[0], "BOOL") == 0) {
|
||||||
do_aggreFunc = false;
|
do_aggreFunc = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -383,14 +383,13 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) {
|
||||||
|
|
||||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||||
|
|
||||||
while ((row = taos_fetch_row(result)) != NULL) {
|
if ((row = taos_fetch_row(result)) != NULL) {
|
||||||
isSet = true;
|
isSet = true;
|
||||||
pTableRecordInfo->isMetric = false;
|
pTableRecordInfo->isMetric = false;
|
||||||
strncpy(pTableRecordInfo->tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
|
strncpy(pTableRecordInfo->tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
|
||||||
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
|
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
|
||||||
strncpy(pTableRecordInfo->tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
|
strncpy(pTableRecordInfo->tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
|
||||||
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
|
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(result);
|
taos_free_result(result);
|
||||||
|
@ -410,11 +409,10 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
while ((row = taos_fetch_row(result)) != NULL) {
|
if ((row = taos_fetch_row(result)) != NULL) {
|
||||||
isSet = true;
|
isSet = true;
|
||||||
pTableRecordInfo->isMetric = true;
|
pTableRecordInfo->isMetric = true;
|
||||||
strcpy(pTableRecordInfo->tableRecord.metric, table);
|
strcpy(pTableRecordInfo->tableRecord.metric, table);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(result);
|
taos_free_result(result);
|
||||||
|
|
|
@ -38,7 +38,7 @@ void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode);
|
||||||
void mgmtIncDnodeRef(SDnodeObj *pDnode);
|
void mgmtIncDnodeRef(SDnodeObj *pDnode);
|
||||||
void mgmtDecDnodeRef(SDnodeObj *pDnode);
|
void mgmtDecDnodeRef(SDnodeObj *pDnode);
|
||||||
void * mgmtGetDnode(int32_t dnodeId);
|
void * mgmtGetDnode(int32_t dnodeId);
|
||||||
void * mgmtGetDnodeByIp(char *ep);
|
void * mgmtGetDnodeByEp(char *ep);
|
||||||
void mgmtUpdateDnode(SDnodeObj *pDnode);
|
void mgmtUpdateDnode(SDnodeObj *pDnode);
|
||||||
int32_t mgmtDropDnode(SDnodeObj *pDnode);
|
int32_t mgmtDropDnode(SDnodeObj *pDnode);
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@ void mgmtCleanUpVgroups();
|
||||||
SVgObj *mgmtGetVgroup(int32_t vgId);
|
SVgObj *mgmtGetVgroup(int32_t vgId);
|
||||||
void mgmtIncVgroupRef(SVgObj *pVgroup);
|
void mgmtIncVgroupRef(SVgObj *pVgroup);
|
||||||
void mgmtDecVgroupRef(SVgObj *pVgroup);
|
void mgmtDecVgroupRef(SVgObj *pVgroup);
|
||||||
void mgmtDropAllDbVgroups(SDbObj *pDropDb);
|
void mgmtDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg);
|
||||||
void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode);
|
void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode);
|
||||||
|
|
||||||
void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup);
|
void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup);
|
||||||
|
|
|
@ -82,7 +82,7 @@ static int32_t mgmtDbActionDelete(SSdbOper *pOper) {
|
||||||
mgmtDropDbFromAcct(pAcct, pDb);
|
mgmtDropDbFromAcct(pAcct, pDb);
|
||||||
mgmtDropAllChildTables(pDb);
|
mgmtDropAllChildTables(pDb);
|
||||||
mgmtDropAllSuperTables(pDb);
|
mgmtDropAllSuperTables(pDb);
|
||||||
mgmtDropAllDbVgroups(pDb);
|
mgmtDropAllDbVgroups(pDb, false);
|
||||||
mgmtDecAcctRef(pAcct);
|
mgmtDecAcctRef(pAcct);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -936,7 +936,9 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 1
|
||||||
|
mgmtDropAllDbVgroups(pMsg->pDb, true);
|
||||||
|
#else
|
||||||
SVgObj *pVgroup = pMsg->pDb->pHead;
|
SVgObj *pVgroup = pMsg->pDb->pHead;
|
||||||
if (pVgroup != NULL) {
|
if (pVgroup != NULL) {
|
||||||
mPrint("vgId:%d, will be dropped", pVgroup->vgId);
|
mPrint("vgId:%d, will be dropped", pVgroup->vgId);
|
||||||
|
|
|
@ -74,7 +74,9 @@ static int32_t mgmtDnodeActionInsert(SSdbOper *pOper) {
|
||||||
static int32_t mgmtDnodeActionDelete(SSdbOper *pOper) {
|
static int32_t mgmtDnodeActionDelete(SSdbOper *pOper) {
|
||||||
SDnodeObj *pDnode = pOper->pObj;
|
SDnodeObj *pDnode = pOper->pObj;
|
||||||
|
|
||||||
|
#ifndef _SYNC
|
||||||
mgmtDropAllDnodeVgroups(pDnode);
|
mgmtDropAllDnodeVgroups(pDnode);
|
||||||
|
#endif
|
||||||
mgmtDropMnodeLocal(pDnode->dnodeId);
|
mgmtDropMnodeLocal(pDnode->dnodeId);
|
||||||
balanceNotify();
|
balanceNotify();
|
||||||
|
|
||||||
|
@ -113,7 +115,7 @@ static int32_t mgmtDnodeActionRestored() {
|
||||||
int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb);
|
int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb);
|
||||||
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
|
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
|
||||||
mgmtCreateDnode(tsLocalEp);
|
mgmtCreateDnode(tsLocalEp);
|
||||||
SDnodeObj *pDnode = mgmtGetDnodeByIp(tsLocalEp);
|
SDnodeObj *pDnode = mgmtGetDnodeByEp(tsLocalEp);
|
||||||
mgmtAddMnode(pDnode->dnodeId);
|
mgmtAddMnode(pDnode->dnodeId);
|
||||||
mgmtDecDnodeRef(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
|
@ -181,7 +183,7 @@ void *mgmtGetDnode(int32_t dnodeId) {
|
||||||
return sdbGetRow(tsDnodeSdb, &dnodeId);
|
return sdbGetRow(tsDnodeSdb, &dnodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *mgmtGetDnodeByIp(char *ep) {
|
void *mgmtGetDnodeByEp(char *ep) {
|
||||||
SDnodeObj *pDnode = NULL;
|
SDnodeObj *pDnode = NULL;
|
||||||
void * pNode = NULL;
|
void * pNode = NULL;
|
||||||
|
|
||||||
|
@ -271,7 +273,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
|
||||||
|
|
||||||
SDnodeObj *pDnode = NULL;
|
SDnodeObj *pDnode = NULL;
|
||||||
if (pStatus->dnodeId == 0) {
|
if (pStatus->dnodeId == 0) {
|
||||||
pDnode = mgmtGetDnodeByIp(pStatus->dnodeEp);
|
pDnode = mgmtGetDnodeByEp(pStatus->dnodeEp);
|
||||||
if (pDnode == NULL) {
|
if (pDnode == NULL) {
|
||||||
mTrace("dnode %s not created", pStatus->dnodeEp);
|
mTrace("dnode %s not created", pStatus->dnodeEp);
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST);
|
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST);
|
||||||
|
@ -358,7 +360,7 @@ static int32_t mgmtCreateDnode(char *ep) {
|
||||||
return grantCode;
|
return grantCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDnodeObj *pDnode = mgmtGetDnodeByIp(ep);
|
SDnodeObj *pDnode = mgmtGetDnodeByEp(ep);
|
||||||
if (pDnode != NULL) {
|
if (pDnode != NULL) {
|
||||||
mgmtDecDnodeRef(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
mError("dnode:%d is alredy exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort);
|
mError("dnode:%d is alredy exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort);
|
||||||
|
@ -391,6 +393,7 @@ static int32_t mgmtCreateDnode(char *ep) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//TODO drop others tables
|
||||||
int32_t mgmtDropDnode(SDnodeObj *pDnode) {
|
int32_t mgmtDropDnode(SDnodeObj *pDnode) {
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
|
@ -407,8 +410,9 @@ int32_t mgmtDropDnode(SDnodeObj *pDnode) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mgmtDropDnodeByIp(char *ep) {
|
static int32_t mgmtDropDnodeByEp(char *ep) {
|
||||||
SDnodeObj *pDnode = mgmtGetDnodeByIp(ep);
|
|
||||||
|
SDnodeObj *pDnode = mgmtGetDnodeByEp(ep);
|
||||||
if (pDnode == NULL) {
|
if (pDnode == NULL) {
|
||||||
mError("dnode:%s, is not exist", ep);
|
mError("dnode:%s, is not exist", ep);
|
||||||
return TSDB_CODE_DNODE_NOT_EXIST;
|
return TSDB_CODE_DNODE_NOT_EXIST;
|
||||||
|
@ -437,7 +441,7 @@ static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) {
|
||||||
} else {
|
} else {
|
||||||
rpcRsp.code = mgmtCreateDnode(pCreate->ep);
|
rpcRsp.code = mgmtCreateDnode(pCreate->ep);
|
||||||
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
|
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
|
||||||
SDnodeObj *pDnode = mgmtGetDnodeByIp(pCreate->ep);
|
SDnodeObj *pDnode = mgmtGetDnodeByEp(pCreate->ep);
|
||||||
mLPrint("dnode:%d, %s is created by %s", pDnode->dnodeId, pCreate->ep, pMsg->pUser->user);
|
mLPrint("dnode:%d, %s is created by %s", pDnode->dnodeId, pCreate->ep, pMsg->pUser->user);
|
||||||
mgmtDecDnodeRef(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
} else {
|
} else {
|
||||||
|
@ -456,7 +460,7 @@ static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) {
|
||||||
if (strcmp(pMsg->pUser->user, "root") != 0) {
|
if (strcmp(pMsg->pUser->user, "root") != 0) {
|
||||||
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
|
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
|
||||||
} else {
|
} else {
|
||||||
rpcRsp.code = mgmtDropDnodeByIp(pDrop->ep);
|
rpcRsp.code = mgmtDropDnodeByEp(pDrop->ep);
|
||||||
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
|
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
|
||||||
mLPrint("dnode:%s is dropped by %s", pDrop->ep, pMsg->pUser->user);
|
mLPrint("dnode:%s is dropped by %s", pDrop->ep, pMsg->pUser->user);
|
||||||
} else {
|
} else {
|
||||||
|
@ -812,7 +816,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
|
||||||
|
|
||||||
SDnodeObj *pDnode = NULL;
|
SDnodeObj *pDnode = NULL;
|
||||||
if (pShow->payloadLen > 0 ) {
|
if (pShow->payloadLen > 0 ) {
|
||||||
pDnode = mgmtGetDnodeByIp(pShow->payload);
|
pDnode = mgmtGetDnodeByEp(pShow->payload);
|
||||||
} else {
|
} else {
|
||||||
mgmtGetNextDnode(NULL, (SDnodeObj **)&pDnode);
|
mgmtGetNextDnode(NULL, (SDnodeObj **)&pDnode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -212,15 +212,16 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
|
||||||
|
|
||||||
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
||||||
tsSdbObj.code = code;
|
tsSdbObj.code = code;
|
||||||
sdbTrace("sdb forward request confirmed, result:%s", tstrerror(code));
|
|
||||||
sem_post(&tsSdbObj.sem);
|
sem_post(&tsSdbObj.sem);
|
||||||
|
sdbTrace("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbForwardToPeer(void *pHead) {
|
static int32_t sdbForwardToPeer(SWalHead *pHead) {
|
||||||
if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS;
|
if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, NULL);
|
int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version);
|
||||||
if (code > 0) {
|
if (code > 0) {
|
||||||
|
sdbTrace("forward request is sent, version:%" PRIu64 ", result:%s", pHead->version, tstrerror(code));
|
||||||
sem_wait(&tsSdbObj.sem);
|
sem_wait(&tsSdbObj.sem);
|
||||||
return tsSdbObj.code;
|
return tsSdbObj.code;
|
||||||
}
|
}
|
||||||
|
@ -332,7 +333,7 @@ void sdbIncRef(void *handle, void *pRow) {
|
||||||
SSdbTable *pTable = handle;
|
SSdbTable *pTable = handle;
|
||||||
int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos);
|
int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos);
|
||||||
atomic_add_fetch_32(pRefCount, 1);
|
atomic_add_fetch_32(pRefCount, 1);
|
||||||
if (0 && pTable->tableId == SDB_TABLE_CTABLE) {
|
if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) {
|
||||||
sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow),
|
sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow),
|
||||||
*pRefCount);
|
*pRefCount);
|
||||||
}
|
}
|
||||||
|
@ -344,7 +345,7 @@ void sdbDecRef(void *handle, void *pRow) {
|
||||||
SSdbTable *pTable = handle;
|
SSdbTable *pTable = handle;
|
||||||
int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos);
|
int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos);
|
||||||
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
|
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
|
||||||
if (0 && pTable->tableId == SDB_TABLE_CTABLE) {
|
if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) {
|
||||||
sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow),
|
sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow),
|
||||||
*pRefCount);
|
*pRefCount);
|
||||||
}
|
}
|
||||||
|
@ -474,14 +475,18 @@ static int sdbWrite(void *param, void *data, int type) {
|
||||||
}
|
}
|
||||||
walFsync(tsSdbObj.wal);
|
walFsync(tsSdbObj.wal);
|
||||||
|
|
||||||
sdbForwardToPeer(pHead);
|
code = sdbForwardToPeer(pHead);
|
||||||
pthread_mutex_unlock(&tsSdbObj.mutex);
|
pthread_mutex_unlock(&tsSdbObj.mutex);
|
||||||
|
|
||||||
// from app, oper is created
|
// from app, oper is created
|
||||||
if (param != NULL) return code;
|
if (param != NULL) {
|
||||||
|
//sdbTrace("request from app is disposed, version:%" PRIu64 " code:%s", pHead->version, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
// from wal or forward msg, should create oper
|
// from wal or forward msg, oper not created, should add into hash
|
||||||
if (tsSdbObj.sync != NULL) {
|
if (tsSdbObj.sync != NULL) {
|
||||||
|
sdbTrace("forward request is received, version:%" PRIu64 " result:%s, confirm it", pHead->version, tstrerror(code));
|
||||||
syncConfirmForward(tsSdbObj.sync, pHead->version, code);
|
syncConfirmForward(tsSdbObj.sync, pHead->version, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -149,7 +149,9 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
|
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
|
||||||
if (rpcMsg == NULL || rpcMsg->pCont == NULL) {
|
assert(rpcMsg);
|
||||||
|
|
||||||
|
if (rpcMsg->pCont == NULL) {
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
|
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -158,7 +158,11 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtDecVgroupRef(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
mTrace("vgId:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxTables, pVgroup->numOfVnodes);
|
|
||||||
|
mTrace("vgId:%d, is updated, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes);
|
||||||
|
if (pDb) {
|
||||||
|
mTrace("tables:%d", pDb->cfg.maxTables);
|
||||||
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -545,7 +549,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
|
||||||
pCfg->cfgVersion = htonl(pDb->cfgVersion);
|
pCfg->cfgVersion = htonl(pDb->cfgVersion);
|
||||||
pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
|
pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
|
||||||
pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks);
|
pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks);
|
||||||
pCfg->maxTables = htonl(pDb->cfg.maxTables);
|
pCfg->maxTables = htonl(pDb->cfg.maxTables + 1);
|
||||||
pCfg->daysPerFile = htonl(pDb->cfg.daysPerFile);
|
pCfg->daysPerFile = htonl(pDb->cfg.daysPerFile);
|
||||||
pCfg->daysToKeep = htonl(pDb->cfg.daysToKeep);
|
pCfg->daysToKeep = htonl(pDb->cfg.daysToKeep);
|
||||||
pCfg->daysToKeep1 = htonl(pDb->cfg.daysToKeep1);
|
pCfg->daysToKeep1 = htonl(pDb->cfg.daysToKeep1);
|
||||||
|
@ -769,7 +773,7 @@ void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtDropAllDbVgroups(SDbObj *pDropDb) {
|
void mgmtDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) {
|
||||||
void *pNode = NULL;
|
void *pNode = NULL;
|
||||||
void *pLastNode = NULL;
|
void *pLastNode = NULL;
|
||||||
int32_t numOfVgroups = 0;
|
int32_t numOfVgroups = 0;
|
||||||
|
@ -790,8 +794,11 @@ void mgmtDropAllDbVgroups(SDbObj *pDropDb) {
|
||||||
sdbDeleteRow(&oper);
|
sdbDeleteRow(&oper);
|
||||||
pNode = pLastNode;
|
pNode = pLastNode;
|
||||||
numOfVgroups++;
|
numOfVgroups++;
|
||||||
|
|
||||||
|
if (sendMsg) {
|
||||||
mgmtSendDropVgroupMsg(pVgroup, NULL);
|
mgmtSendDropVgroupMsg(pVgroup, NULL);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
mgmtDecVgroupRef(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
}
|
}
|
||||||
|
|
|
@ -678,7 +678,7 @@ void SQLInfoDestroy(SSqlInfo *pInfo) {
|
||||||
free(pInfo->pDCLInfo->a);
|
free(pInfo->pDCLInfo->a);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->type == TSDB_SQL_CREATE_DB) {
|
if (pInfo->pDCLInfo != NULL && pInfo->type == TSDB_SQL_CREATE_DB) {
|
||||||
tVariantListDestroy(pInfo->pDCLInfo->dbOpt.keep);
|
tVariantListDestroy(pInfo->pDCLInfo->dbOpt.keep);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -636,9 +636,13 @@ void tsBufResetPos(STSBuf* pTSBuf) {
|
||||||
|
|
||||||
STSElem tsBufGetElem(STSBuf* pTSBuf) {
|
STSElem tsBufGetElem(STSBuf* pTSBuf) {
|
||||||
STSElem elem1 = {.vnode = -1};
|
STSElem elem1 = {.vnode = -1};
|
||||||
STSCursor* pCur = &pTSBuf->cur;
|
|
||||||
|
|
||||||
if (pTSBuf == NULL || pCur->vnodeIndex < 0) {
|
if (pTSBuf == NULL) {
|
||||||
|
return elem1;
|
||||||
|
}
|
||||||
|
|
||||||
|
STSCursor* pCur = &pTSBuf->cur;
|
||||||
|
if (pCur != NULL && pCur->vnodeIndex < 0) {
|
||||||
return elem1;
|
return elem1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,10 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) {
|
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) {
|
||||||
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) {
|
if (pWindowResInfo == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (pWindowResInfo->capacity == 0) {
|
||||||
assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL);
|
assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -611,10 +611,6 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
|
||||||
if (pCfg->maxTables < TSDB_MIN_TABLES || pCfg->maxTables > TSDB_MAX_TABLES) return -1;
|
if (pCfg->maxTables < TSDB_MIN_TABLES || pCfg->maxTables > TSDB_MAX_TABLES) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Since tableId starts from 1, we increase maxTables by 1
|
|
||||||
// TODO: take a fancier way to do this
|
|
||||||
pCfg->maxTables++;
|
|
||||||
|
|
||||||
// Check daysPerFile
|
// Check daysPerFile
|
||||||
if (pCfg->daysPerFile == -1) {
|
if (pCfg->daysPerFile == -1) {
|
||||||
pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
|
pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
|
||||||
|
|
|
@ -226,7 +226,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCheckInfo->iter == NULL) {
|
if (pCheckInfo->iter == NULL && pTable->mem) {
|
||||||
pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey,
|
pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey,
|
||||||
TSDB_DATA_TYPE_TIMESTAMP, pHandle->order);
|
TSDB_DATA_TYPE_TIMESTAMP, pHandle->order);
|
||||||
|
|
||||||
|
|
|
@ -20,10 +20,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define TAOS_QTYPE_RPC 0
|
|
||||||
#define TAOS_QTYPE_FWD 1
|
|
||||||
#define TAOS_QTYPE_WAL 2
|
|
||||||
|
|
||||||
typedef void* taos_queue;
|
typedef void* taos_queue;
|
||||||
typedef void* taos_qset;
|
typedef void* taos_qset;
|
||||||
typedef void* taos_qall;
|
typedef void* taos_qall;
|
||||||
|
|
|
@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
AUX_SOURCE_DIRECTORY(src SRC)
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
|
|
||||||
ADD_LIBRARY(vnode ${SRC})
|
ADD_LIBRARY(vnode ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(vnode tsdb)
|
TARGET_LINK_LIBRARIES(vnode tsdb tcq)
|
||||||
ENDIF ()
|
ENDIF ()
|
|
@ -30,6 +30,8 @@
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
#include "vnodeLog.h"
|
#include "vnodeLog.h"
|
||||||
|
#include "tcq.h"
|
||||||
|
//#include "tsync.h"
|
||||||
|
|
||||||
static int32_t tsOpennedVnodes;
|
static int32_t tsOpennedVnodes;
|
||||||
static void *tsDnodeVnodesHash;
|
static void *tsDnodeVnodesHash;
|
||||||
|
@ -192,8 +194,28 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
||||||
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
||||||
|
|
||||||
|
SCqCfg cqCfg;
|
||||||
|
sprintf(cqCfg.user, "root");
|
||||||
|
strcpy(cqCfg.pass, tsInternalPass);
|
||||||
|
cqCfg.cqWrite = vnodeWriteToQueue;
|
||||||
|
pVnode->cq = cqOpen(pVnode, &cqCfg);
|
||||||
|
|
||||||
|
STsdbAppH appH = {0};
|
||||||
|
appH.appH = (void *)pVnode;
|
||||||
|
appH.walCallBack = vnodeWalCallback;
|
||||||
|
appH.cqH = pVnode->cq;
|
||||||
|
|
||||||
|
sprintf(temp, "%s/tsdb", rootDir);
|
||||||
|
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
|
||||||
|
if (pVnode->tsdb == NULL) {
|
||||||
|
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
|
||||||
|
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
sprintf(temp, "%s/wal", rootDir);
|
sprintf(temp, "%s/wal", rootDir);
|
||||||
pVnode->wal = walOpen(temp, &pVnode->walCfg);
|
pVnode->wal = walOpen(temp, &pVnode->walCfg);
|
||||||
|
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
||||||
|
|
||||||
SSyncInfo syncInfo;
|
SSyncInfo syncInfo;
|
||||||
syncInfo.vgId = pVnode->vgId;
|
syncInfo.vgId = pVnode->vgId;
|
||||||
|
@ -208,24 +230,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
syncInfo.notifyRole = vnodeNotifyRole;
|
syncInfo.notifyRole = vnodeNotifyRole;
|
||||||
pVnode->sync = syncStart(&syncInfo);
|
pVnode->sync = syncStart(&syncInfo);
|
||||||
|
|
||||||
|
// start continuous query
|
||||||
|
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
|
||||||
|
cqStart(pVnode->cq);
|
||||||
|
|
||||||
pVnode->events = NULL;
|
pVnode->events = NULL;
|
||||||
pVnode->cq = NULL;
|
|
||||||
|
|
||||||
STsdbAppH appH = {0};
|
|
||||||
appH.appH = (void *)pVnode;
|
|
||||||
appH.walCallBack = vnodeWalCallback;
|
|
||||||
|
|
||||||
sprintf(temp, "%s/tsdb", rootDir);
|
|
||||||
void *pTsdb = tsdbOpenRepo(temp, &appH);
|
|
||||||
if (pTsdb == NULL) {
|
|
||||||
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
|
|
||||||
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
pVnode->tsdb = pTsdb;
|
|
||||||
|
|
||||||
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
|
||||||
|
|
||||||
pVnode->status = TAOS_VN_STATUS_READY;
|
pVnode->status = TAOS_VN_STATUS_READY;
|
||||||
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
|
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
|
||||||
|
@ -350,10 +359,16 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
|
||||||
pVnode->sync = NULL;
|
pVnode->sync = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbCloseRepo(pVnode->tsdb);
|
cqClose(pVnode->cq);
|
||||||
walClose(pVnode->wal);
|
pVnode->cq = NULL;
|
||||||
vnodeSaveVersion(pVnode);
|
|
||||||
|
|
||||||
|
tsdbCloseRepo(pVnode->tsdb);
|
||||||
|
pVnode->tsdb = NULL;
|
||||||
|
|
||||||
|
walClose(pVnode->wal);
|
||||||
|
pVnode->wal = NULL;
|
||||||
|
|
||||||
|
vnodeSaveVersion(pVnode);
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,6 +392,11 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) {
|
||||||
static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
||||||
SVnodeObj *pVnode = ahandle;
|
SVnodeObj *pVnode = ahandle;
|
||||||
pVnode->role = role;
|
pVnode->role = role;
|
||||||
|
|
||||||
|
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
|
||||||
|
cqStart(pVnode->cq);
|
||||||
|
else
|
||||||
|
cqStop(pVnode->cq);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
#include "vnodeLog.h"
|
#include "vnodeLog.h"
|
||||||
|
#include "tcq.h"
|
||||||
|
|
||||||
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
|
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
|
||||||
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
||||||
|
@ -142,7 +143,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
||||||
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
||||||
|
|
||||||
for (int i = 0; i < numOfTags; i++) {
|
for (int i = 0; i < numOfTags; i++) {
|
||||||
STColumn *pTCol = schemaColAt(pDestSchema, i);
|
STColumn *pTCol = schemaColAt(pDestTagSchema, i);
|
||||||
tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset);
|
tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset);
|
||||||
accumBytes += htons(pSchema[i + numOfColumns].bytes);
|
accumBytes += htons(pSchema[i + numOfColumns].bytes);
|
||||||
}
|
}
|
||||||
|
@ -150,7 +151,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbCreateTable(pVnode->tsdb, &tCfg);
|
code = tsdbCreateTable(pVnode->tsdb, &tCfg);
|
||||||
|
|
||||||
tfree(pDestSchema);
|
tfree(pDestSchema);
|
||||||
|
|
||||||
dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code);
|
dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code);
|
||||||
|
|
|
@ -34,12 +34,12 @@ python3 ./test.py $1 -f table/db_table.py
|
||||||
python3 ./test.py -s $1
|
python3 ./test.py -s $1
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
python3 ./test.py $1 -f import_merge/importDataLastTO.py
|
#python3 ./test.py $1 -f import_merge/importDataLastTO.py
|
||||||
python3 ./test.py -s $1
|
#python3 ./test.py -s $1
|
||||||
sleep 1
|
#sleep 1
|
||||||
python3 ./test.py $1 -f import_merge/importDataLastT.py
|
#python3 ./test.py $1 -f import_merge/importDataLastT.py
|
||||||
python3 ./test.py -s $1
|
#python3 ./test.py -s $1
|
||||||
sleep 1
|
#sleep 1
|
||||||
python3 ./test.py $1 -f import_merge/importDataTO.py
|
python3 ./test.py $1 -f import_merge/importDataTO.py
|
||||||
python3 ./test.py -s $1
|
python3 ./test.py -s $1
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
|
@ -83,11 +83,21 @@ if __name__ == "__main__":
|
||||||
|
|
||||||
tdLog.exit('stop All dnodes')
|
tdLog.exit('stop All dnodes')
|
||||||
|
|
||||||
if masterIp == "":
|
|
||||||
tdDnodes.init(deployPath)
|
tdDnodes.init(deployPath)
|
||||||
tdDnodes.setTestCluster(testCluster)
|
tdDnodes.setTestCluster(testCluster)
|
||||||
tdDnodes.setValgrind(valgrind)
|
tdDnodes.setValgrind(valgrind)
|
||||||
|
|
||||||
|
tdDnodes.stopAll()
|
||||||
|
tdDnodes.deploy(1)
|
||||||
|
tdDnodes.start(1)
|
||||||
|
|
||||||
|
if masterIp == "":
|
||||||
|
host='127.0.0.1'
|
||||||
|
else:
|
||||||
|
host=masterIp
|
||||||
|
|
||||||
|
tdLog.notice("Procedures for tdengine deployed in %s" % (host))
|
||||||
|
|
||||||
if testCluster:
|
if testCluster:
|
||||||
tdLog.notice("Procedures for testing cluster")
|
tdLog.notice("Procedures for testing cluster")
|
||||||
if fileName == "all":
|
if fileName == "all":
|
||||||
|
@ -96,23 +106,12 @@ if __name__ == "__main__":
|
||||||
tdCases.runOneCluster(fileName)
|
tdCases.runOneCluster(fileName)
|
||||||
else:
|
else:
|
||||||
tdLog.notice("Procedures for testing self-deployment")
|
tdLog.notice("Procedures for testing self-deployment")
|
||||||
tdDnodes.stopAll()
|
|
||||||
tdDnodes.deploy(1)
|
|
||||||
tdDnodes.start(1)
|
|
||||||
conn = taos.connect(
|
conn = taos.connect(
|
||||||
host='127.0.0.1',
|
host,
|
||||||
config=tdDnodes.getSimCfgPath())
|
config=tdDnodes.getSimCfgPath())
|
||||||
if fileName == "all":
|
if fileName == "all":
|
||||||
tdCases.runAllLinux(conn)
|
tdCases.runAllLinux(conn)
|
||||||
else:
|
else:
|
||||||
tdCases.runOneLinux(conn, fileName)
|
tdCases.runOneLinux(conn, fileName)
|
||||||
conn.close()
|
|
||||||
else:
|
|
||||||
tdLog.notice("Procedures for tdengine deployed in %s" % (masterIp))
|
|
||||||
cfgPath = "../../build/test/cfg" # was: tdDnodes.getSimCfgPath()
|
|
||||||
conn = taos.connect(host=masterIp, config=cfgPath)
|
|
||||||
if fileName == "all":
|
|
||||||
tdCases.runAllWindows(conn)
|
|
||||||
else:
|
|
||||||
tdCases.runOneWindows(conn, fileName)
|
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
#run unique/big/balance.sim
|
#run unique/big/balance.sim
|
||||||
#run unique/big/maxvnodes.sim
|
#run unique/big/maxvnodes.sim
|
||||||
run unique/big/tcp.sim
|
#run unique/big/tcp.sim
|
||||||
|
|
|
@ -8,7 +8,7 @@ system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3
|
||||||
system sh/cfg.sh -n dnode3 -c numOfMPeers -v 3
|
system sh/cfg.sh -n dnode3 -c numOfMPeers -v 3
|
||||||
|
|
||||||
print ============== step1
|
print ============== step1
|
||||||
system sh/exec_up.sh -n dnode1 -s start
|
system sh/exec_up.sh -n dnode1 -s start -t
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
sql show mnodes
|
sql show mnodes
|
||||||
|
@ -26,7 +26,7 @@ if $data3_3 != null then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print ============== step2
|
print ============== step2
|
||||||
system sh/exec_up.sh -n dnode2 -s start
|
system sh/exec_up.sh -n dnode2 -s start -t
|
||||||
sql create dnode $hostname2
|
sql create dnode $hostname2
|
||||||
sleep 8000
|
sleep 8000
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ if $dnode3Role != null then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print ============== step3
|
print ============== step3
|
||||||
system sh/exec_up.sh -n dnode3 -s start
|
system sh/exec_up.sh -n dnode3 -s start -t
|
||||||
sql create dnode $hostname3
|
sql create dnode $hostname3
|
||||||
sleep 8000
|
sleep 8000
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ sleep 3000
|
||||||
|
|
||||||
system sh/deploy.sh -n dnode2 -i 2
|
system sh/deploy.sh -n dnode2 -i 2
|
||||||
system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3
|
system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3
|
||||||
system sh/exec_up.sh -n dnode2 -s start
|
system sh/exec_up.sh -n dnode2 -s start -t
|
||||||
|
|
||||||
print ============== step5
|
print ============== step5
|
||||||
sql create dnode $hostname2
|
sql create dnode $hostname2
|
||||||
|
@ -106,7 +106,7 @@ sleep 8000
|
||||||
|
|
||||||
sql show mnodes
|
sql show mnodes
|
||||||
$dnode1Role = $data2_1
|
$dnode1Role = $data2_1
|
||||||
$dnode2Role = $data3_4
|
$dnode2Role = $data2_4
|
||||||
$dnode3Role = $data2_3
|
$dnode3Role = $data2_3
|
||||||
print dnode1 ==> $dnode1Role
|
print dnode1 ==> $dnode1Role
|
||||||
print dnode2 ==> $dnode2Role
|
print dnode2 ==> $dnode2Role
|
||||||
|
@ -128,7 +128,7 @@ sleep 10000
|
||||||
|
|
||||||
sql show mnodes
|
sql show mnodes
|
||||||
$dnode1Role = $data2_1
|
$dnode1Role = $data2_1
|
||||||
$dnode2Role = $data3_4
|
$dnode2Role = $data2_4
|
||||||
$dnode3Role = $data2_3
|
$dnode3Role = $data2_3
|
||||||
print dnode1 ==> $dnode1Role
|
print dnode1 ==> $dnode1Role
|
||||||
print dnode2 ==> $dnode2Role
|
print dnode2 ==> $dnode2Role
|
||||||
|
|
|
@ -10,7 +10,7 @@ NC='\033[0m'
|
||||||
cd script
|
cd script
|
||||||
./test.sh -f basicSuite.sim 2>&1 | grep 'success\|failed\|fault' | tee out.txt
|
./test.sh -f basicSuite.sim 2>&1 | grep 'success\|failed\|fault' | tee out.txt
|
||||||
|
|
||||||
totalSuccess=`grep success out.txt | wc -l`
|
totalSuccess=`grep -w 'success' out.txt | wc -l`
|
||||||
totalBasic=`grep success out.txt | grep Suite | wc -l`
|
totalBasic=`grep success out.txt | grep Suite | wc -l`
|
||||||
|
|
||||||
if [ "$totalSuccess" -gt "0" ]; then
|
if [ "$totalSuccess" -gt "0" ]; then
|
||||||
|
@ -18,7 +18,7 @@ if [ "$totalSuccess" -gt "0" ]; then
|
||||||
echo -e "${GREEN} ### Total $totalSuccess TSIM case(s) succeed! ### ${NC}"
|
echo -e "${GREEN} ### Total $totalSuccess TSIM case(s) succeed! ### ${NC}"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
totalFailed=`grep 'failed\|fault' out.txt | wc -l`
|
totalFailed=`grep -w 'failed\|fault' out.txt | wc -l`
|
||||||
if [ "$totalFailed" -ne "0" ]; then
|
if [ "$totalFailed" -ne "0" ]; then
|
||||||
echo -e "${RED} ### Total $totalFailed TSIM case(s) failed! ### ${NC}"
|
echo -e "${RED} ### Total $totalFailed TSIM case(s) failed! ### ${NC}"
|
||||||
exit $totalFailed
|
exit $totalFailed
|
||||||
|
|
Loading…
Reference in New Issue