diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 904050fb1b..57d7234379 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -14,5 +14,6 @@ ADD_SUBDIRECTORY(mnode) ADD_SUBDIRECTORY(vnode) ADD_SUBDIRECTORY(tsdb) ADD_SUBDIRECTORY(wal) +ADD_SUBDIRECTORY(cq) ADD_SUBDIRECTORY(dnode) ADD_SUBDIRECTORY(connector/jdbc) diff --git a/src/cq/CMakeLists.txt b/src/cq/CMakeLists.txt new file mode 100644 index 0000000000..e8796306f3 --- /dev/null +++ b/src/cq/CMakeLists.txt @@ -0,0 +1,16 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) +INCLUDE_DIRECTORIES(inc) + +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) + +ADD_LIBRARY(tcq ${SRC}) +TARGET_LINK_LIBRARIES(tcq tutil common taos) + +ADD_SUBDIRECTORY(test) + diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c new file mode 100644 index 0000000000..62b9a41494 --- /dev/null +++ b/src/cq/src/cqMain.c @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE + +#include +#include +#include +#include "taosdef.h" +#include "taosmsg.h" +#include "tglobal.h" +#include "tlog.h" +#include "twal.h" +#include "tcq.h" +#include "taos.h" + +#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);} +#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);} +#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);} +#define cPrint(...) {taosPrintLog("WAL ", 255, __VA_ARGS__);} + +typedef struct { + int vgId; + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + FCqWrite cqWrite; + void *ahandle; + int num; // number of continuous streams + struct SCqObj *pHead; + void *dbConn; + pthread_mutex_t mutex; +} SCqContext; + +typedef struct SCqObj { + int tid; // table ID + int rowSize; // bytes of a row + char *sqlStr; // SQL string + int columns; // number of columns + SSchema *pSchema; // pointer to schema array + void *pStream; + struct SCqObj *prev; + struct SCqObj *next; + SCqContext *pContext; +} SCqObj; + +int cqDebugFlag = 135; + +static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); + +void *cqOpen(void *ahandle, const SCqCfg *pCfg) { + + SCqContext *pContext = calloc(sizeof(SCqContext), 1); + if (pContext == NULL) return NULL; + + strcpy(pContext->user, pCfg->user); + strcpy(pContext->pass, pCfg->pass); + pContext->vgId = pCfg->vgId; + pContext->cqWrite = pCfg->cqWrite; + pContext->ahandle = ahandle; + + pthread_mutex_init(&pContext->mutex, NULL); + + cTrace("vgId:%d, CQ is opened", pContext->vgId); + + return pContext; +} + +void cqClose(void *handle) { + SCqContext *pContext = handle; + + // stop all CQs + cqStop(pContext); + + // free all resources + SCqObj *pObj = pContext->pHead; + while (pObj) { + SCqObj *pTemp = pObj; + pObj = pObj->next; + free(pTemp); + } + + pthread_mutex_destroy(&pContext->mutex); + + cTrace("vgId:%d, CQ is closed", pContext->vgId); + free(pContext); +} + +void cqStart(void *handle) { + SCqContext *pContext = handle; + cTrace("vgId:%d, start all CQs", pContext->vgId); + if (pContext->dbConn) return; + + pthread_mutex_lock(&pContext->mutex); + + tscEmbedded = 1; + pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); + if (pContext->dbConn == NULL) { + cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); + pthread_mutex_unlock(&pContext->mutex); + return; + } + + SCqObj *pObj = pContext->pHead; + while (pObj) { + int64_t lastKey = 0; + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); + } else { + cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); + } + pObj = pObj->next; + } + + pthread_mutex_unlock(&pContext->mutex); +} + +void cqStop(void *handle) { + SCqContext *pContext = handle; + cTrace("vgId:%d, stop all CQs", pContext->vgId); + if (pContext->dbConn == NULL) return; + + pthread_mutex_lock(&pContext->mutex); + + SCqObj *pObj = pContext->pHead; + while (pObj) { + if (pObj->pStream) { + taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr); + } + + pObj = pObj->next; + } + + if (pContext->dbConn) taos_close(pContext->dbConn); + pContext->dbConn = NULL; + + pthread_mutex_unlock(&pContext->mutex); +} + +void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int columns) { + SCqContext *pContext = handle; + + SCqObj *pObj = calloc(sizeof(SCqObj), 1); + if (pObj == NULL) return NULL; + + pObj->tid = tid; + pObj->sqlStr = malloc(strlen(sqlStr)+1); + strcpy(pObj->sqlStr, sqlStr); + + pObj->columns = columns; + + int size = sizeof(SSchema) * columns; + pObj->pSchema = malloc(size); + memcpy(pObj->pSchema, pSchema, size); + + cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); + + pthread_mutex_lock(&pContext->mutex); + + pObj->next = pContext->pHead; + if (pContext->pHead) pContext->pHead->prev = pObj; + pContext->pHead = pObj; + + if (pContext->dbConn) { + int64_t lastKey = 0; + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); + } else { + cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr); + } + } + + pthread_mutex_unlock(&pContext->mutex); + + return pObj; +} + +void cqDrop(void *handle) { + SCqObj *pObj = handle; + SCqContext *pContext = pObj->pContext; + + pthread_mutex_lock(&pContext->mutex); + + if (pObj->prev) { + pObj->prev->next = pObj->next; + } else { + pContext->pHead = pObj->next; + } + + if (pObj->next) { + pObj->next->prev = pObj->prev; + } + + // free the resources associated + if (pObj->pStream) taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + + cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); + free(pObj); + + pthread_mutex_lock(&pContext->mutex); +} + +static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { + SCqObj *pObj = (SCqObj *)param; + SCqContext *pContext = pObj->pContext; + if (pObj->pStream == NULL) return; + + cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); + + // construct data + int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize; + char *buffer = calloc(size, 1); + + SWalHead *pHead = (SWalHead *)buffer; + pHead->msgType = TSDB_MSG_TYPE_SUBMIT; + pHead->len = size - sizeof(SWalHead); + + SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead)); + // to do: fill in the SSubmitMsg structure + pSubmit->numOfBlocks = 1; + + + SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); + // to do: fill in the SSubmitBlk strucuture + pBlk->tid = pObj->tid; + + + // write into vnode write queue + pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); +} + diff --git a/src/cq/test/CMakeLists.txt b/src/cq/test/CMakeLists.txt new file mode 100644 index 0000000000..99c729dff4 --- /dev/null +++ b/src/cq/test/CMakeLists.txt @@ -0,0 +1,17 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) + INCLUDE_DIRECTORIES(../inc) + + LIST(APPEND CQTEST_SRC ./cqtest.c) + ADD_EXECUTABLE(cqtest ${CQTEST_SRC}) + TARGET_LINK_LIBRARIES(cqtest tcq) + +ENDIF () + + diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c new file mode 100644 index 0000000000..f620f44382 --- /dev/null +++ b/src/cq/test/cqtest.c @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include "os.h" +#include "taosdef.h" +#include "taosmsg.h" +#include "tglobal.h" +#include "tlog.h" +#include "tcq.h" + +int64_t ver = 0; +void *pCq = NULL; + +int writeToQueue(void *pVnode, void *data, int type) { + return 0; +} + +int main(int argc, char *argv[]) { + int num = 3; + + for (int i=1; i + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _TD_CQ_H_ +#define _TD_CQ_H_ + +#ifdef __cplusplus +extern "C" { +#endif + + +typedef int (*FCqWrite)(void *ahandle, void *pHead, int type); + +typedef struct { + int vgId; + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + FCqWrite cqWrite; +} SCqCfg; + +// the following API shall be called by vnode +void *cqOpen(void *ahandle, const SCqCfg *pCfg); +void cqClose(void *handle); + +// if vnode is master, vnode call this API to start CQ +void cqStart(void *handle); + +// if vnode is slave/unsynced, vnode shall call this API to stop CQ +void cqStop(void *handle); + +// cqCreate is called by TSDB to start an instance of CQ +void *cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns); + +// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate +void cqDrop(void *handle); + +extern int cqDebugFlag; + + +#ifdef __cplusplus +} +#endif + +#endif // _TD_CQ_H_ diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index a59a278d52..35cf1b52c7 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -38,9 +38,9 @@ extern "C" { typedef struct { // WAL handle void *appH; + void *cqH; int (*walCallBack)(void *); int (*eventCallBack)(void *); - int (*cqueryCallBack)(void *); } STsdbAppH; // --------- TSDB REPOSITORY CONFIGURATION DEFINITION diff --git a/src/util/inc/tqueue.h b/src/util/inc/tqueue.h index c45eb10518..f4086dcd12 100644 --- a/src/util/inc/tqueue.h +++ b/src/util/inc/tqueue.h @@ -20,10 +20,6 @@ extern "C" { #endif -#define TAOS_QTYPE_RPC 0 -#define TAOS_QTYPE_FWD 1 -#define TAOS_QTYPE_WAL 2 - typedef void* taos_queue; typedef void* taos_qset; typedef void* taos_qall; diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 6ceb83cb45..a1c56b32b5 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(vnode ${SRC}) - TARGET_LINK_LIBRARIES(vnode tsdb) -ENDIF () \ No newline at end of file + TARGET_LINK_LIBRARIES(vnode tsdb tcq) +ENDIF () diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 1302ceaff4..03e508c5bf 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -30,6 +30,8 @@ #include "vnode.h" #include "vnodeInt.h" #include "vnodeLog.h" +#include "tcq.h" +//#include "tsync.h" static int32_t tsOpennedVnodes; static void *tsDnodeVnodesHash; @@ -192,8 +194,28 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); + SCqCfg cqCfg; + sprintf(cqCfg.user, "root"); + strcpy(cqCfg.pass, tsInternalPass); + cqCfg.cqWrite = vnodeWriteToQueue; + pVnode->cq = cqOpen(pVnode, &cqCfg); + + STsdbAppH appH = {0}; + appH.appH = (void *)pVnode; + appH.walCallBack = vnodeWalCallback; + appH.cqH = pVnode->cq; + + sprintf(temp, "%s/tsdb", rootDir); + pVnode->tsdb = tsdbOpenRepo(temp, &appH); + if (pVnode->tsdb == NULL) { + dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + return terrno; + } + sprintf(temp, "%s/wal", rootDir); pVnode->wal = walOpen(temp, &pVnode->walCfg); + walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); SSyncInfo syncInfo; syncInfo.vgId = pVnode->vgId; @@ -208,24 +230,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.notifyRole = vnodeNotifyRole; pVnode->sync = syncStart(&syncInfo); + // start continuous query + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + cqStart(pVnode->cq); + pVnode->events = NULL; - pVnode->cq = NULL; - - STsdbAppH appH = {0}; - appH.appH = (void *)pVnode; - appH.walCallBack = vnodeWalCallback; - - sprintf(temp, "%s/tsdb", rootDir); - void *pTsdb = tsdbOpenRepo(temp, &appH); - if (pTsdb == NULL) { - dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - return terrno; - } - - pVnode->tsdb = pTsdb; - - walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); pVnode->status = TAOS_VN_STATUS_READY; dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); @@ -350,10 +359,16 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { pVnode->sync = NULL; } - tsdbCloseRepo(pVnode->tsdb); - walClose(pVnode->wal); - vnodeSaveVersion(pVnode); + cqClose(pVnode->cq); + pVnode->cq = NULL; + tsdbCloseRepo(pVnode->tsdb); + pVnode->tsdb = NULL; + + walClose(pVnode->wal); + pVnode->wal = NULL; + + vnodeSaveVersion(pVnode); vnodeRelease(pVnode); } @@ -377,6 +392,11 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) { static void vnodeNotifyRole(void *ahandle, int8_t role) { SVnodeObj *pVnode = ahandle; pVnode->role = role; + + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + cqStart(pVnode->cq); + else + cqStop(pVnode->cq); } static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index a88737bc39..278a2bb747 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -26,6 +26,7 @@ #include "vnode.h" #include "vnodeInt.h" #include "vnodeLog.h" +#include "tcq.h" static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); @@ -149,7 +150,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe } code = tsdbCreateTable(pVnode->tsdb, &tCfg); - tfree(pDestSchema); dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code);