diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 904050fb1b..57d7234379 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -14,5 +14,6 @@ ADD_SUBDIRECTORY(mnode) ADD_SUBDIRECTORY(vnode) ADD_SUBDIRECTORY(tsdb) ADD_SUBDIRECTORY(wal) +ADD_SUBDIRECTORY(cq) ADD_SUBDIRECTORY(dnode) ADD_SUBDIRECTORY(connector/jdbc) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 1d3ec2f017..e6fd9e47ee 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -229,6 +229,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } } else if (pInfo->type == TSDB_SQL_DROP_DNODE) { + pzName->n = strdequote(pzName->z); strncpy(pTableMetaInfo->name, pzName->z, pzName->n); } else { // drop user if (pzName->n > TSDB_USER_LEN) { diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 5e91742678..d9dc9a9f41 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -140,7 +140,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd // offset of cmd in SSqlObj structure char *pSqlObjAddr = (char *)pCmd - offsetof(SSqlObj, cmd); - if (pMemBuffer == NULL || pDesc->pColumnModel == NULL) { + if (pMemBuffer == NULL) { + tscError("%p pMemBuffer", pMemBuffer); + pRes->code = TSDB_CODE_APP_ERROR; + return; + } + + if (pDesc->pColumnModel == NULL) { tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscError("%p no local buffer or intermediate result format model", pSqlObjAddr); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a6575cfb8a..f8db232afa 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -231,7 +231,11 @@ int tscSendMsgToServer(SSqlObj *pSql) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; - if (pSql == NULL || pSql->signature != pSql) { + if (pSql == NULL) { + tscError("%p sql is already released", pSql->signature); + return; + } + if (pSql->signature != pSql) { tscError("%p sql is already released, signature:%p", pSql, pSql->signature); return; } @@ -313,7 +317,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { pRes->rspType = rpcMsg->msgType; pRes->rspLen = rpcMsg->contLen; - if (pRes->rspLen > 0) { + if (pRes->rspLen > 0 && rpcMsg->pCont) { char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); if (tmp == NULL) { pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 9055dc4e01..aff4ad525b 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -220,8 +220,9 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void taos_close(TAOS *taos) { STscObj *pObj = (STscObj *)taos; - if (pObj == NULL) return; - if (pObj->signature != pObj) return; + if (pObj == NULL || pObj->signature != pObj) { + return; + } if (pObj->pHb != NULL) { tscSetFreeHeatBeat(pObj); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index d690681729..5f5af09cf8 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -172,17 +172,17 @@ static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql) { static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) { SSqlStream * pStream = (SSqlStream *)param; SSqlObj * pSql = (SSqlObj *)res; - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); if (pSql == NULL || numOfRows < 0) { int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); - tscClearTableMetaInfo(pTableMetaInfo, true); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); return; } + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); + if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful. pStream->numOfRes += numOfRows; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index fb386293c1..9a739333d4 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -710,18 +710,20 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi } void tscCloseTscObj(STscObj* pObj) { + assert(pObj != NULL); + pObj->signature = NULL; SSqlObj* pSql = pObj->pSql; + if (pSql) { terrno = pSql->res.code; + sem_destroy(&pSql->rspSem); } taosTmrStopA(&(pObj->pTimer)); tscFreeSqlObj(pSql); - sem_destroy(&pSql->rspSem); rpcClose(pObj->pMgmtConn); - pthread_mutex_destroy(&pObj->mutex); tscTrace("%p DB connection is closed", pObj); diff --git a/src/cq/CMakeLists.txt b/src/cq/CMakeLists.txt new file mode 100644 index 0000000000..e8796306f3 --- /dev/null +++ b/src/cq/CMakeLists.txt @@ -0,0 +1,16 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) +INCLUDE_DIRECTORIES(inc) + +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) + +ADD_LIBRARY(tcq ${SRC}) +TARGET_LINK_LIBRARIES(tcq tutil common taos) + +ADD_SUBDIRECTORY(test) + diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c new file mode 100644 index 0000000000..62b9a41494 --- /dev/null +++ b/src/cq/src/cqMain.c @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE + +#include +#include +#include +#include "taosdef.h" +#include "taosmsg.h" +#include "tglobal.h" +#include "tlog.h" +#include "twal.h" +#include "tcq.h" +#include "taos.h" + +#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);} +#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);} +#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);} +#define cPrint(...) {taosPrintLog("WAL ", 255, __VA_ARGS__);} + +typedef struct { + int vgId; + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + FCqWrite cqWrite; + void *ahandle; + int num; // number of continuous streams + struct SCqObj *pHead; + void *dbConn; + pthread_mutex_t mutex; +} SCqContext; + +typedef struct SCqObj { + int tid; // table ID + int rowSize; // bytes of a row + char *sqlStr; // SQL string + int columns; // number of columns + SSchema *pSchema; // pointer to schema array + void *pStream; + struct SCqObj *prev; + struct SCqObj *next; + SCqContext *pContext; +} SCqObj; + +int cqDebugFlag = 135; + +static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); + +void *cqOpen(void *ahandle, const SCqCfg *pCfg) { + + SCqContext *pContext = calloc(sizeof(SCqContext), 1); + if (pContext == NULL) return NULL; + + strcpy(pContext->user, pCfg->user); + strcpy(pContext->pass, pCfg->pass); + pContext->vgId = pCfg->vgId; + pContext->cqWrite = pCfg->cqWrite; + pContext->ahandle = ahandle; + + pthread_mutex_init(&pContext->mutex, NULL); + + cTrace("vgId:%d, CQ is opened", pContext->vgId); + + return pContext; +} + +void cqClose(void *handle) { + SCqContext *pContext = handle; + + // stop all CQs + cqStop(pContext); + + // free all resources + SCqObj *pObj = pContext->pHead; + while (pObj) { + SCqObj *pTemp = pObj; + pObj = pObj->next; + free(pTemp); + } + + pthread_mutex_destroy(&pContext->mutex); + + cTrace("vgId:%d, CQ is closed", pContext->vgId); + free(pContext); +} + +void cqStart(void *handle) { + SCqContext *pContext = handle; + cTrace("vgId:%d, start all CQs", pContext->vgId); + if (pContext->dbConn) return; + + pthread_mutex_lock(&pContext->mutex); + + tscEmbedded = 1; + pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); + if (pContext->dbConn == NULL) { + cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); + pthread_mutex_unlock(&pContext->mutex); + return; + } + + SCqObj *pObj = pContext->pHead; + while (pObj) { + int64_t lastKey = 0; + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); + } else { + cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); + } + pObj = pObj->next; + } + + pthread_mutex_unlock(&pContext->mutex); +} + +void cqStop(void *handle) { + SCqContext *pContext = handle; + cTrace("vgId:%d, stop all CQs", pContext->vgId); + if (pContext->dbConn == NULL) return; + + pthread_mutex_lock(&pContext->mutex); + + SCqObj *pObj = pContext->pHead; + while (pObj) { + if (pObj->pStream) { + taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr); + } + + pObj = pObj->next; + } + + if (pContext->dbConn) taos_close(pContext->dbConn); + pContext->dbConn = NULL; + + pthread_mutex_unlock(&pContext->mutex); +} + +void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int columns) { + SCqContext *pContext = handle; + + SCqObj *pObj = calloc(sizeof(SCqObj), 1); + if (pObj == NULL) return NULL; + + pObj->tid = tid; + pObj->sqlStr = malloc(strlen(sqlStr)+1); + strcpy(pObj->sqlStr, sqlStr); + + pObj->columns = columns; + + int size = sizeof(SSchema) * columns; + pObj->pSchema = malloc(size); + memcpy(pObj->pSchema, pSchema, size); + + cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); + + pthread_mutex_lock(&pContext->mutex); + + pObj->next = pContext->pHead; + if (pContext->pHead) pContext->pHead->prev = pObj; + pContext->pHead = pObj; + + if (pContext->dbConn) { + int64_t lastKey = 0; + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); + } else { + cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr); + } + } + + pthread_mutex_unlock(&pContext->mutex); + + return pObj; +} + +void cqDrop(void *handle) { + SCqObj *pObj = handle; + SCqContext *pContext = pObj->pContext; + + pthread_mutex_lock(&pContext->mutex); + + if (pObj->prev) { + pObj->prev->next = pObj->next; + } else { + pContext->pHead = pObj->next; + } + + if (pObj->next) { + pObj->next->prev = pObj->prev; + } + + // free the resources associated + if (pObj->pStream) taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + + cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); + free(pObj); + + pthread_mutex_lock(&pContext->mutex); +} + +static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { + SCqObj *pObj = (SCqObj *)param; + SCqContext *pContext = pObj->pContext; + if (pObj->pStream == NULL) return; + + cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); + + // construct data + int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize; + char *buffer = calloc(size, 1); + + SWalHead *pHead = (SWalHead *)buffer; + pHead->msgType = TSDB_MSG_TYPE_SUBMIT; + pHead->len = size - sizeof(SWalHead); + + SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead)); + // to do: fill in the SSubmitMsg structure + pSubmit->numOfBlocks = 1; + + + SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); + // to do: fill in the SSubmitBlk strucuture + pBlk->tid = pObj->tid; + + + // write into vnode write queue + pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); +} + diff --git a/src/cq/test/CMakeLists.txt b/src/cq/test/CMakeLists.txt new file mode 100644 index 0000000000..99c729dff4 --- /dev/null +++ b/src/cq/test/CMakeLists.txt @@ -0,0 +1,17 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) + INCLUDE_DIRECTORIES(../inc) + + LIST(APPEND CQTEST_SRC ./cqtest.c) + ADD_EXECUTABLE(cqtest ${CQTEST_SRC}) + TARGET_LINK_LIBRARIES(cqtest tcq) + +ENDIF () + + diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c new file mode 100644 index 0000000000..f620f44382 --- /dev/null +++ b/src/cq/test/cqtest.c @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include "os.h" +#include "taosdef.h" +#include "taosmsg.h" +#include "tglobal.h" +#include "tlog.h" +#include "tcq.h" + +int64_t ver = 0; +void *pCq = NULL; + +int writeToQueue(void *pVnode, void *data, int type) { + return 0; +} + +int main(int argc, char *argv[]) { + int num = 3; + + for (int i=1; i + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _TD_CQ_H_ +#define _TD_CQ_H_ + +#ifdef __cplusplus +extern "C" { +#endif + + +typedef int (*FCqWrite)(void *ahandle, void *pHead, int type); + +typedef struct { + int vgId; + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + FCqWrite cqWrite; +} SCqCfg; + +// the following API shall be called by vnode +void *cqOpen(void *ahandle, const SCqCfg *pCfg); +void cqClose(void *handle); + +// if vnode is master, vnode call this API to start CQ +void cqStart(void *handle); + +// if vnode is slave/unsynced, vnode shall call this API to stop CQ +void cqStop(void *handle); + +// cqCreate is called by TSDB to start an instance of CQ +void *cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns); + +// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate +void cqDrop(void *handle); + +extern int cqDebugFlag; + + +#ifdef __cplusplus +} +#endif + +#endif // _TD_CQ_H_ diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 5b117fbc92..f4242fbe1f 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -38,9 +38,9 @@ extern "C" { typedef struct { // WAL handle void *appH; + void *cqH; int (*walCallBack)(void *); int (*eventCallBack)(void *); - int (*cqueryCallBack)(void *); } STsdbAppH; // --------- TSDB REPOSITORY CONFIGURATION DEFINITION diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c index 256b251075..e5c50bb74e 100644 --- a/src/kit/shell/src/shellImport.c +++ b/src/kit/shell/src/shellImport.c @@ -142,6 +142,7 @@ static void shellSourceFile(TAOS *con, char *fptr) { if (wordexp(fptr, &full_path, 0) != 0) { fprintf(stderr, "ERROR: illegal file name\n"); + free(cmd); return; } diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index d3453cda36..22ffa78c81 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -62,7 +62,13 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { if (arg) arguments->password = arg; break; case 'P': - tsMnodeShellPort = atoi(arg); + if (arg) { + tsMnodeShellPort = atoi(arg); + } else { + fprintf(stderr, "Invalid port\n"); + return -1; + } + break; case 't': arguments->timezone = arg; @@ -101,7 +107,12 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { wordfree(&full_path); break; case 'T': - arguments->threadNum = atoi(arg); + if (arg) { + arguments->threadNum = atoi(arg); + } else { + fprintf(stderr, "Invalid number of threads\n"); + return -1; + } break; case 'd': arguments->database = arg; diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 24855ab8b5..937c8d177d 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -340,6 +340,9 @@ int main(int argc, char *argv[]) { int count_data_type = 0; char dataString[512]; bool do_aggreFunc = true; + + memset(dataString, 0, 512); + if (strcasecmp(data_type[0], "BINARY") == 0 || strcasecmp(data_type[0], "BOOL") == 0) { do_aggreFunc = false; } diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index 2e64c9bccc..ed98a9b92c 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -383,14 +383,13 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) { TAOS_FIELD *fields = taos_fetch_fields(result); - while ((row = taos_fetch_row(result)) != NULL) { + if ((row = taos_fetch_row(result)) != NULL) { isSet = true; pTableRecordInfo->isMetric = false; strncpy(pTableRecordInfo->tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes); strncpy(pTableRecordInfo->tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes); - break; } taos_free_result(result); @@ -410,11 +409,10 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) { return -1; } - while ((row = taos_fetch_row(result)) != NULL) { + if ((row = taos_fetch_row(result)) != NULL) { isSet = true; pTableRecordInfo->isMetric = true; strcpy(pTableRecordInfo->tableRecord.metric, table); - break; } taos_free_result(result); diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 1b5199e727..1d7116c6c0 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -38,7 +38,7 @@ void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode); void mgmtIncDnodeRef(SDnodeObj *pDnode); void mgmtDecDnodeRef(SDnodeObj *pDnode); void * mgmtGetDnode(int32_t dnodeId); -void * mgmtGetDnodeByIp(char *ep); +void * mgmtGetDnodeByEp(char *ep); void mgmtUpdateDnode(SDnodeObj *pDnode); int32_t mgmtDropDnode(SDnodeObj *pDnode); diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 3f8dc35a00..21a2c9b896 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -32,7 +32,7 @@ void mgmtCleanUpVgroups(); SVgObj *mgmtGetVgroup(int32_t vgId); void mgmtIncVgroupRef(SVgObj *pVgroup); void mgmtDecVgroupRef(SVgObj *pVgroup); -void mgmtDropAllDbVgroups(SDbObj *pDropDb); +void mgmtDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg); void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode); void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 311842083c..6a9e4189d0 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -82,7 +82,7 @@ static int32_t mgmtDbActionDelete(SSdbOper *pOper) { mgmtDropDbFromAcct(pAcct, pDb); mgmtDropAllChildTables(pDb); mgmtDropAllSuperTables(pDb); - mgmtDropAllDbVgroups(pDb); + mgmtDropAllDbVgroups(pDb, false); mgmtDecAcctRef(pAcct); return TSDB_CODE_SUCCESS; @@ -936,7 +936,9 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { return; } -#if 0 +#if 1 + mgmtDropAllDbVgroups(pMsg->pDb, true); +#else SVgObj *pVgroup = pMsg->pDb->pHead; if (pVgroup != NULL) { mPrint("vgId:%d, will be dropped", pVgroup->vgId); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index c7643b9bf9..6629737787 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -74,7 +74,9 @@ static int32_t mgmtDnodeActionInsert(SSdbOper *pOper) { static int32_t mgmtDnodeActionDelete(SSdbOper *pOper) { SDnodeObj *pDnode = pOper->pObj; +#ifndef _SYNC mgmtDropAllDnodeVgroups(pDnode); +#endif mgmtDropMnodeLocal(pDnode->dnodeId); balanceNotify(); @@ -113,7 +115,7 @@ static int32_t mgmtDnodeActionRestored() { int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); if (numOfRows <= 0 && dnodeIsFirstDeploy()) { mgmtCreateDnode(tsLocalEp); - SDnodeObj *pDnode = mgmtGetDnodeByIp(tsLocalEp); + SDnodeObj *pDnode = mgmtGetDnodeByEp(tsLocalEp); mgmtAddMnode(pDnode->dnodeId); mgmtDecDnodeRef(pDnode); } @@ -181,7 +183,7 @@ void *mgmtGetDnode(int32_t dnodeId) { return sdbGetRow(tsDnodeSdb, &dnodeId); } -void *mgmtGetDnodeByIp(char *ep) { +void *mgmtGetDnodeByEp(char *ep) { SDnodeObj *pDnode = NULL; void * pNode = NULL; @@ -271,7 +273,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { SDnodeObj *pDnode = NULL; if (pStatus->dnodeId == 0) { - pDnode = mgmtGetDnodeByIp(pStatus->dnodeEp); + pDnode = mgmtGetDnodeByEp(pStatus->dnodeEp); if (pDnode == NULL) { mTrace("dnode %s not created", pStatus->dnodeEp); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); @@ -358,7 +360,7 @@ static int32_t mgmtCreateDnode(char *ep) { return grantCode; } - SDnodeObj *pDnode = mgmtGetDnodeByIp(ep); + SDnodeObj *pDnode = mgmtGetDnodeByEp(ep); if (pDnode != NULL) { mgmtDecDnodeRef(pDnode); mError("dnode:%d is alredy exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort); @@ -391,6 +393,7 @@ static int32_t mgmtCreateDnode(char *ep) { return code; } +//TODO drop others tables int32_t mgmtDropDnode(SDnodeObj *pDnode) { SSdbOper oper = { .type = SDB_OPER_GLOBAL, @@ -407,8 +410,9 @@ int32_t mgmtDropDnode(SDnodeObj *pDnode) { return code; } -static int32_t mgmtDropDnodeByIp(char *ep) { - SDnodeObj *pDnode = mgmtGetDnodeByIp(ep); +static int32_t mgmtDropDnodeByEp(char *ep) { + + SDnodeObj *pDnode = mgmtGetDnodeByEp(ep); if (pDnode == NULL) { mError("dnode:%s, is not exist", ep); return TSDB_CODE_DNODE_NOT_EXIST; @@ -437,7 +441,7 @@ static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) { } else { rpcRsp.code = mgmtCreateDnode(pCreate->ep); if (rpcRsp.code == TSDB_CODE_SUCCESS) { - SDnodeObj *pDnode = mgmtGetDnodeByIp(pCreate->ep); + SDnodeObj *pDnode = mgmtGetDnodeByEp(pCreate->ep); mLPrint("dnode:%d, %s is created by %s", pDnode->dnodeId, pCreate->ep, pMsg->pUser->user); mgmtDecDnodeRef(pDnode); } else { @@ -456,7 +460,7 @@ static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) { if (strcmp(pMsg->pUser->user, "root") != 0) { rpcRsp.code = TSDB_CODE_NO_RIGHTS; } else { - rpcRsp.code = mgmtDropDnodeByIp(pDrop->ep); + rpcRsp.code = mgmtDropDnodeByEp(pDrop->ep); if (rpcRsp.code == TSDB_CODE_SUCCESS) { mLPrint("dnode:%s is dropped by %s", pDrop->ep, pMsg->pUser->user); } else { @@ -812,7 +816,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo SDnodeObj *pDnode = NULL; if (pShow->payloadLen > 0 ) { - pDnode = mgmtGetDnodeByIp(pShow->payload); + pDnode = mgmtGetDnodeByEp(pShow->payload); } else { mgmtGetNextDnode(NULL, (SDnodeObj **)&pDnode); } diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 53b9d2b814..839695a6e5 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -212,15 +212,16 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { tsSdbObj.code = code; - sdbTrace("sdb forward request confirmed, result:%s", tstrerror(code)); sem_post(&tsSdbObj.sem); + sdbTrace("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code)); } -static int32_t sdbForwardToPeer(void *pHead) { +static int32_t sdbForwardToPeer(SWalHead *pHead) { if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; - int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, NULL); + int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version); if (code > 0) { + sdbTrace("forward request is sent, version:%" PRIu64 ", result:%s", pHead->version, tstrerror(code)); sem_wait(&tsSdbObj.sem); return tsSdbObj.code; } @@ -332,7 +333,7 @@ void sdbIncRef(void *handle, void *pRow) { SSdbTable *pTable = handle; int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); atomic_add_fetch_32(pRefCount, 1); - if (0 && pTable->tableId == SDB_TABLE_CTABLE) { + if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) { sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); } @@ -344,7 +345,7 @@ void sdbDecRef(void *handle, void *pRow) { SSdbTable *pTable = handle; int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); - if (0 && pTable->tableId == SDB_TABLE_CTABLE) { + if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) { sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); } @@ -474,14 +475,18 @@ static int sdbWrite(void *param, void *data, int type) { } walFsync(tsSdbObj.wal); - sdbForwardToPeer(pHead); + code = sdbForwardToPeer(pHead); pthread_mutex_unlock(&tsSdbObj.mutex); // from app, oper is created - if (param != NULL) return code; - - // from wal or forward msg, should create oper + if (param != NULL) { + //sdbTrace("request from app is disposed, version:%" PRIu64 " code:%s", pHead->version, tstrerror(code)); + return code; + } + + // from wal or forward msg, oper not created, should add into hash if (tsSdbObj.sync != NULL) { + sdbTrace("forward request is received, version:%" PRIu64 " result:%s, confirm it", pHead->version, tstrerror(code)); syncConfirmForward(tsSdbObj.sync, pHead->version, code); } diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 08ea74c9f6..c86bf2a2dd 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -149,7 +149,9 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { } static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { - if (rpcMsg == NULL || rpcMsg->pCont == NULL) { + assert(rpcMsg); + + if (rpcMsg->pCont == NULL) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); return; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 839dce5c38..d8007d000d 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -158,7 +158,11 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) { } mgmtDecVgroupRef(pVgroup); - mTrace("vgId:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxTables, pVgroup->numOfVnodes); + + mTrace("vgId:%d, is updated, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes); + if (pDb) { + mTrace("tables:%d", pDb->cfg.maxTables); + } return TSDB_CODE_SUCCESS; } @@ -545,7 +549,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { pCfg->cfgVersion = htonl(pDb->cfgVersion); pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize); pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks); - pCfg->maxTables = htonl(pDb->cfg.maxTables); + pCfg->maxTables = htonl(pDb->cfg.maxTables + 1); pCfg->daysPerFile = htonl(pDb->cfg.daysPerFile); pCfg->daysToKeep = htonl(pDb->cfg.daysToKeep); pCfg->daysToKeep1 = htonl(pDb->cfg.daysToKeep1); @@ -769,7 +773,7 @@ void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode) { } } -void mgmtDropAllDbVgroups(SDbObj *pDropDb) { +void mgmtDropAllDbVgroups(SDbObj *pDropDb, bool sendMsg) { void *pNode = NULL; void *pLastNode = NULL; int32_t numOfVgroups = 0; @@ -790,7 +794,10 @@ void mgmtDropAllDbVgroups(SDbObj *pDropDb) { sdbDeleteRow(&oper); pNode = pLastNode; numOfVgroups++; - mgmtSendDropVgroupMsg(pVgroup, NULL); + + if (sendMsg) { + mgmtSendDropVgroupMsg(pVgroup, NULL); + } } mgmtDecVgroupRef(pVgroup); diff --git a/src/query/src/qparserImpl.c b/src/query/src/qparserImpl.c index 80e22d4514..d8e48de918 100644 --- a/src/query/src/qparserImpl.c +++ b/src/query/src/qparserImpl.c @@ -678,7 +678,7 @@ void SQLInfoDestroy(SSqlInfo *pInfo) { free(pInfo->pDCLInfo->a); } - if (pInfo->type == TSDB_SQL_CREATE_DB) { + if (pInfo->pDCLInfo != NULL && pInfo->type == TSDB_SQL_CREATE_DB) { tVariantListDestroy(pInfo->pDCLInfo->dbOpt.keep); } @@ -902,4 +902,4 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) { pDBInfo->keep = NULL; memset(&pDBInfo->precision, 0, sizeof(SSQLToken)); -} \ No newline at end of file +} diff --git a/src/query/src/qtsbuf.c b/src/query/src/qtsbuf.c index 062a8038b2..1d5c4f2d9d 100644 --- a/src/query/src/qtsbuf.c +++ b/src/query/src/qtsbuf.c @@ -636,12 +636,16 @@ void tsBufResetPos(STSBuf* pTSBuf) { STSElem tsBufGetElem(STSBuf* pTSBuf) { STSElem elem1 = {.vnode = -1}; - STSCursor* pCur = &pTSBuf->cur; - if (pTSBuf == NULL || pCur->vnodeIndex < 0) { + if (pTSBuf == NULL) { return elem1; } + STSCursor* pCur = &pTSBuf->cur; + if (pCur != NULL && pCur->vnodeIndex < 0) { + return elem1; + } + STSBlock* pBlock = &pTSBuf->block; elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode; @@ -920,4 +924,4 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { pTSBuf->fileSize += getDataStartOffset(); return pTSBuf; -} \ No newline at end of file +} diff --git a/src/query/src/queryUtil.c b/src/query/src/queryUtil.c index b4d8911284..9da02f9f0f 100644 --- a/src/query/src/queryUtil.c +++ b/src/query/src/queryUtil.c @@ -62,7 +62,10 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) { } void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) { - if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { + if (pWindowResInfo == NULL) { + return; + } + if (pWindowResInfo->capacity == 0) { assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL); return; } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c0f6030fa6..fcfbcc9014 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -611,10 +611,6 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { if (pCfg->maxTables < TSDB_MIN_TABLES || pCfg->maxTables > TSDB_MAX_TABLES) return -1; } - // Since tableId starts from 1, we increase maxTables by 1 - // TODO: take a fancier way to do this - pCfg->maxTables++; - // Check daysPerFile if (pCfg->daysPerFile == -1) { pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index ff521a4194..14cb052196 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -226,7 +226,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { return false; } - if (pCheckInfo->iter == NULL) { + if (pCheckInfo->iter == NULL && pTable->mem) { pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, pHandle->order); diff --git a/src/util/inc/tqueue.h b/src/util/inc/tqueue.h index c45eb10518..f4086dcd12 100644 --- a/src/util/inc/tqueue.h +++ b/src/util/inc/tqueue.h @@ -20,10 +20,6 @@ extern "C" { #endif -#define TAOS_QTYPE_RPC 0 -#define TAOS_QTYPE_FWD 1 -#define TAOS_QTYPE_WAL 2 - typedef void* taos_queue; typedef void* taos_qset; typedef void* taos_qall; diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 6ceb83cb45..a1c56b32b5 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(vnode ${SRC}) - TARGET_LINK_LIBRARIES(vnode tsdb) -ENDIF () \ No newline at end of file + TARGET_LINK_LIBRARIES(vnode tsdb tcq) +ENDIF () diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 1302ceaff4..03e508c5bf 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -30,6 +30,8 @@ #include "vnode.h" #include "vnodeInt.h" #include "vnodeLog.h" +#include "tcq.h" +//#include "tsync.h" static int32_t tsOpennedVnodes; static void *tsDnodeVnodesHash; @@ -192,8 +194,28 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); + SCqCfg cqCfg; + sprintf(cqCfg.user, "root"); + strcpy(cqCfg.pass, tsInternalPass); + cqCfg.cqWrite = vnodeWriteToQueue; + pVnode->cq = cqOpen(pVnode, &cqCfg); + + STsdbAppH appH = {0}; + appH.appH = (void *)pVnode; + appH.walCallBack = vnodeWalCallback; + appH.cqH = pVnode->cq; + + sprintf(temp, "%s/tsdb", rootDir); + pVnode->tsdb = tsdbOpenRepo(temp, &appH); + if (pVnode->tsdb == NULL) { + dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + return terrno; + } + sprintf(temp, "%s/wal", rootDir); pVnode->wal = walOpen(temp, &pVnode->walCfg); + walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); SSyncInfo syncInfo; syncInfo.vgId = pVnode->vgId; @@ -208,24 +230,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.notifyRole = vnodeNotifyRole; pVnode->sync = syncStart(&syncInfo); + // start continuous query + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + cqStart(pVnode->cq); + pVnode->events = NULL; - pVnode->cq = NULL; - - STsdbAppH appH = {0}; - appH.appH = (void *)pVnode; - appH.walCallBack = vnodeWalCallback; - - sprintf(temp, "%s/tsdb", rootDir); - void *pTsdb = tsdbOpenRepo(temp, &appH); - if (pTsdb == NULL) { - dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - return terrno; - } - - pVnode->tsdb = pTsdb; - - walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); pVnode->status = TAOS_VN_STATUS_READY; dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); @@ -350,10 +359,16 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { pVnode->sync = NULL; } - tsdbCloseRepo(pVnode->tsdb); - walClose(pVnode->wal); - vnodeSaveVersion(pVnode); + cqClose(pVnode->cq); + pVnode->cq = NULL; + tsdbCloseRepo(pVnode->tsdb); + pVnode->tsdb = NULL; + + walClose(pVnode->wal); + pVnode->wal = NULL; + + vnodeSaveVersion(pVnode); vnodeRelease(pVnode); } @@ -377,6 +392,11 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) { static void vnodeNotifyRole(void *ahandle, int8_t role) { SVnodeObj *pVnode = ahandle; pVnode->role = role; + + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + cqStart(pVnode->cq); + else + cqStop(pVnode->cq); } static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index e5a5be4ed5..84609b8844 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -26,6 +26,7 @@ #include "vnode.h" #include "vnodeInt.h" #include "vnodeLog.h" +#include "tcq.h" static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); @@ -142,7 +143,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); for (int i = 0; i < numOfTags; i++) { - STColumn *pTCol = schemaColAt(pDestSchema, i); + STColumn *pTCol = schemaColAt(pDestTagSchema, i); tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset); accumBytes += htons(pSchema[i + numOfColumns].bytes); } @@ -150,7 +151,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe } code = tsdbCreateTable(pVnode->tsdb, &tCfg); - tfree(pDestSchema); dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code); diff --git a/tests/pytest/smoketest.sh b/tests/pytest/smoketest.sh index af597fb6c5..7dbefa9402 100755 --- a/tests/pytest/smoketest.sh +++ b/tests/pytest/smoketest.sh @@ -34,12 +34,12 @@ python3 ./test.py $1 -f table/db_table.py python3 ./test.py -s $1 sleep 1 -python3 ./test.py $1 -f import_merge/importDataLastTO.py -python3 ./test.py -s $1 -sleep 1 -python3 ./test.py $1 -f import_merge/importDataLastT.py -python3 ./test.py -s $1 -sleep 1 +#python3 ./test.py $1 -f import_merge/importDataLastTO.py +#python3 ./test.py -s $1 +#sleep 1 +#python3 ./test.py $1 -f import_merge/importDataLastT.py +#python3 ./test.py -s $1 +#sleep 1 python3 ./test.py $1 -f import_merge/importDataTO.py python3 ./test.py -s $1 sleep 1 diff --git a/tests/pytest/test.py b/tests/pytest/test.py index 479406a00b..9bf1660634 100644 --- a/tests/pytest/test.py +++ b/tests/pytest/test.py @@ -83,36 +83,35 @@ if __name__ == "__main__": tdLog.exit('stop All dnodes') - if masterIp == "": - tdDnodes.init(deployPath) - tdDnodes.setTestCluster(testCluster) - tdDnodes.setValgrind(valgrind) + tdDnodes.init(deployPath) + tdDnodes.setTestCluster(testCluster) + tdDnodes.setValgrind(valgrind) - if testCluster: - tdLog.notice("Procedures for testing cluster") - if fileName == "all": - tdCases.runAllCluster() - else: - tdCases.runOneCluster(fileName) - else: - tdLog.notice("Procedures for testing self-deployment") - tdDnodes.stopAll() - tdDnodes.deploy(1) - tdDnodes.start(1) - conn = taos.connect( - host='127.0.0.1', - config=tdDnodes.getSimCfgPath()) - if fileName == "all": - tdCases.runAllLinux(conn) - else: - tdCases.runOneLinux(conn, fileName) - conn.close() + tdDnodes.stopAll() + tdDnodes.deploy(1) + tdDnodes.start(1) + + if masterIp == "": + host='127.0.0.1' else: - tdLog.notice("Procedures for tdengine deployed in %s" % (masterIp)) - cfgPath = "../../build/test/cfg" # was: tdDnodes.getSimCfgPath() - conn = taos.connect(host=masterIp, config=cfgPath) + host=masterIp + + tdLog.notice("Procedures for tdengine deployed in %s" % (host)) + + if testCluster: + tdLog.notice("Procedures for testing cluster") if fileName == "all": - tdCases.runAllWindows(conn) + tdCases.runAllCluster() else: - tdCases.runOneWindows(conn, fileName) - conn.close() + tdCases.runOneCluster(fileName) + else: + tdLog.notice("Procedures for testing self-deployment") + conn = taos.connect( + host, + config=tdDnodes.getSimCfgPath()) + if fileName == "all": + tdCases.runAllLinux(conn) + else: + tdCases.runOneLinux(conn, fileName) + + conn.close() diff --git a/tests/script/unique/big/testSuite.sim b/tests/script/unique/big/testSuite.sim index 05a003ac2f..5881d1fb67 100644 --- a/tests/script/unique/big/testSuite.sim +++ b/tests/script/unique/big/testSuite.sim @@ -1,3 +1,3 @@ #run unique/big/balance.sim #run unique/big/maxvnodes.sim -run unique/big/tcp.sim +#run unique/big/tcp.sim diff --git a/tests/script/unique/mnode/mgmt33.sim b/tests/script/unique/mnode/mgmt33.sim index df0a289ab7..30fae2c243 100644 --- a/tests/script/unique/mnode/mgmt33.sim +++ b/tests/script/unique/mnode/mgmt33.sim @@ -8,7 +8,7 @@ system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3 system sh/cfg.sh -n dnode3 -c numOfMPeers -v 3 print ============== step1 -system sh/exec_up.sh -n dnode1 -s start +system sh/exec_up.sh -n dnode1 -s start -t sql connect sql show mnodes @@ -26,7 +26,7 @@ if $data3_3 != null then endi print ============== step2 -system sh/exec_up.sh -n dnode2 -s start +system sh/exec_up.sh -n dnode2 -s start -t sql create dnode $hostname2 sleep 8000 @@ -49,7 +49,7 @@ if $dnode3Role != null then endi print ============== step3 -system sh/exec_up.sh -n dnode3 -s start +system sh/exec_up.sh -n dnode3 -s start -t sql create dnode $hostname3 sleep 8000 @@ -98,7 +98,7 @@ sleep 3000 system sh/deploy.sh -n dnode2 -i 2 system sh/cfg.sh -n dnode2 -c numOfMPeers -v 3 -system sh/exec_up.sh -n dnode2 -s start +system sh/exec_up.sh -n dnode2 -s start -t print ============== step5 sql create dnode $hostname2 @@ -106,7 +106,7 @@ sleep 8000 sql show mnodes $dnode1Role = $data2_1 -$dnode2Role = $data3_4 +$dnode2Role = $data2_4 $dnode3Role = $data2_3 print dnode1 ==> $dnode1Role print dnode2 ==> $dnode2Role @@ -128,7 +128,7 @@ sleep 10000 sql show mnodes $dnode1Role = $data2_1 -$dnode2Role = $data3_4 +$dnode2Role = $data2_4 $dnode3Role = $data2_3 print dnode1 ==> $dnode1Role print dnode2 ==> $dnode2Role diff --git a/tests/test-all.sh b/tests/test-all.sh index ee1904ba7c..4bffca1201 100755 --- a/tests/test-all.sh +++ b/tests/test-all.sh @@ -10,7 +10,7 @@ NC='\033[0m' cd script ./test.sh -f basicSuite.sim 2>&1 | grep 'success\|failed\|fault' | tee out.txt -totalSuccess=`grep success out.txt | wc -l` +totalSuccess=`grep -w 'success' out.txt | wc -l` totalBasic=`grep success out.txt | grep Suite | wc -l` if [ "$totalSuccess" -gt "0" ]; then @@ -18,7 +18,7 @@ if [ "$totalSuccess" -gt "0" ]; then echo -e "${GREEN} ### Total $totalSuccess TSIM case(s) succeed! ### ${NC}" fi -totalFailed=`grep 'failed\|fault' out.txt | wc -l` +totalFailed=`grep -w 'failed\|fault' out.txt | wc -l` if [ "$totalFailed" -ne "0" ]; then echo -e "${RED} ### Total $totalFailed TSIM case(s) failed! ### ${NC}" exit $totalFailed