From 3a6a714a945e741f3c05f07da45645b0f4376799 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 19 Jan 2022 01:56:41 -0800 Subject: [PATCH] remove files --- source/libs/tfs/inc/tfsInt.h | 1 - src/cq/CMakeLists.txt | 15 - src/cq/src/cqMain.c | 532 ----------------------------------- src/cq/test/CMakeLists.txt | 6 - src/cq/test/cqtest.c | 108 ------- src/inc/query.h | 106 ------- src/inc/tcq.h | 74 ----- src/inc/tsdb.h | 426 ---------------------------- 8 files changed, 1268 deletions(-) delete mode 100644 src/cq/CMakeLists.txt delete mode 100644 src/cq/src/cqMain.c delete mode 100644 src/cq/test/CMakeLists.txt delete mode 100644 src/cq/test/cqtest.c delete mode 100644 src/inc/query.h delete mode 100644 src/inc/tcq.h delete mode 100644 src/inc/tsdb.h diff --git a/source/libs/tfs/inc/tfsInt.h b/source/libs/tfs/inc/tfsInt.h index c88a2a4ea8..cfc246f07b 100644 --- a/source/libs/tfs/inc/tfsInt.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -22,7 +22,6 @@ #include "taoserror.h" #include "tcoding.h" #include "tfs.h" -#include "tglobal.h" #include "thash.h" #include "tlog.h" diff --git a/src/cq/CMakeLists.txt b/src/cq/CMakeLists.txt deleted file mode 100644 index f01ccb8728..0000000000 --- a/src/cq/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20) -PROJECT(TDengine) - -INCLUDE_DIRECTORIES(inc) -INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) -INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) -AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) - -ADD_LIBRARY(tcq ${SRC}) -IF (TD_SOMODE_STATIC) - TARGET_LINK_LIBRARIES(tcq tutil common taos_static) -ELSE () - TARGET_LINK_LIBRARIES(tcq tutil common taos) -ENDIF () -ADD_SUBDIRECTORY(test) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c deleted file mode 100644 index d610ed605f..0000000000 --- a/src/cq/src/cqMain.c +++ /dev/null @@ -1,532 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE - -#include -#include -#include -#include - -#include "../../../include/client/taos.h" -#include "taosdef.h" -#include "tmsg.h" -#include "tcq.h" -#include "tdataformat.h" -#include "tglobal.h" -#include "tlog.h" -#include "tsclient.h" -#include "ttimer.h" -#include "twal.h" - -#define cFatal(...) { if (cqDebugFlag & DEBUG_FATAL) { taosPrintLog("CQ FATAL ", 255, __VA_ARGS__); }} -#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("CQ ERROR ", 255, __VA_ARGS__); }} -#define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("CQ WARN ", 255, __VA_ARGS__); }} -#define cInfo(...) { if (cqDebugFlag & DEBUG_INFO) { taosPrintLog("CQ ", 255, __VA_ARGS__); }} -#define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }} -#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }} - - -typedef struct SCqObj { - tmr_h tmrId; - int64_t rid; - uint64_t uid; - int32_t tid; // table ID - int32_t rowSize; // bytes of a row - char * dstTable; - char * sqlStr; // SQL string - STSchema * pSchema; // pointer to schema array - void * pStream; - struct SCqObj *prev; - struct SCqObj *next; - SCqContext * pContext; -} SCqObj; - -static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); -static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); - -int32_t cqObjRef = -1; -int32_t cqVnodeNum = 0; - -void cqRmFromList(SCqObj *pObj) { - //LOCK in caller - - SCqContext *pContext = pObj->pContext; - - if (pObj->prev) { - pObj->prev->next = pObj->next; - } else { - pContext->pHead = pObj->next; - } - - if (pObj->next) { - pObj->next->prev = pObj->prev; - } - -} - -static void freeSCqContext(void *handle) { - if (handle == NULL) { - return; - } - SCqContext *pContext = handle; - pthread_mutex_destroy(&pContext->mutex); - - taosTmrCleanUp(pContext->tmrCtrl); - pContext->tmrCtrl = NULL; - cDebug("vgId:%d, CQ is closed", pContext->vgId); - free(pContext); -} - - -void cqFree(void *handle) { - if (tsEnableStream == 0) { - return; - } - SCqObj *pObj = handle; - SCqContext *pContext = pObj->pContext; - int32_t delete = 0; - - pthread_mutex_lock(&pContext->mutex); - - // free the resources associated - if (pObj->pStream) { - taos_close_stream(pObj->pStream); - pObj->pStream = NULL; - } else { - taosTmrStop(pObj->tmrId); - pObj->tmrId = 0; - } - - cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); - tdFreeSchema(pObj->pSchema); - free(pObj->dstTable); - free(pObj->sqlStr); - free(pObj); - - pContext->cqObjNum--; - - if (pContext->cqObjNum <= 0 && pContext->delete) { - delete = 1; - } - - pthread_mutex_unlock(&pContext->mutex); - - if (delete) { - freeSCqContext(pContext); - } -} - - -void cqCreateRef() { - int32_t ref = atomic_load_32(&cqObjRef); - if (ref == -1) { - ref = taosOpenRef(4096, cqFree); - - if (atomic_val_compare_exchange_32(&cqObjRef, -1, ref) != -1) { - taosCloseRef(ref); - } - } -} - - -void *cqOpen(void *ahandle, const SCqCfg *pCfg) { - if (tsEnableStream == 0) { - return NULL; - } - SCqContext *pContext = calloc(sizeof(SCqContext), 1); - if (pContext == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - atomic_add_fetch_32(&cqVnodeNum, 1); - - cqCreateRef(); - - pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ"); - - tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user)); - tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass)); - const char* db = pCfg->db; - for (const char* p = db; *p != 0; p++) { - if (*p == '.') { - db = p + 1; - break; - } - } - tstrncpy(pContext->db, db, sizeof(pContext->db)); - pContext->vgId = pCfg->vgId; - pContext->cqWrite = pCfg->cqWrite; - tscEmbedded = 1; - - pthread_mutex_init(&pContext->mutex, NULL); - - - cDebug("vgId:%d, CQ is opened", pContext->vgId); - - return pContext; -} - - -void cqClose(void *handle) { - if (tsEnableStream == 0) { - return; - } - SCqContext *pContext = handle; - if (handle == NULL) return; - - pContext->delete = 1; - int32_t hasCq = 0; - int32_t existLoop = 0; - - // stop all CQs - cqStop(pContext); - - int64_t rid = 0; - - while (1) { - pthread_mutex_lock(&pContext->mutex); - - SCqObj *pObj = pContext->pHead; - if (pObj) { - cqRmFromList(pObj); - - rid = pObj->rid; - - hasCq = 1; - - if (pContext->pHead == NULL) { - existLoop = 1; - } - } else { - pthread_mutex_unlock(&pContext->mutex); - break; - } - - pthread_mutex_unlock(&pContext->mutex); - - taosRemoveRef(cqObjRef, rid); - - if (existLoop) { - break; - } - } - - if (hasCq == 0) { - freeSCqContext(pContext); - } - - int32_t remainn = atomic_sub_fetch_32(&cqVnodeNum, 1); - if (remainn <= 0) { - int32_t ref = cqObjRef; - cqObjRef = -1; - taosCloseRef(ref); - } -} - -void cqStart(void *handle) { - if (tsEnableStream == 0) { - return; - } - SCqContext *pContext = handle; - if (pContext->dbConn || pContext->master) return; - - cDebug("vgId:%d, start all CQs", pContext->vgId); - pthread_mutex_lock(&pContext->mutex); - - pContext->master = 1; - - SCqObj *pObj = pContext->pHead; - while (pObj) { - cqCreateStream(pContext, pObj); - pObj = pObj->next; - } - - pthread_mutex_unlock(&pContext->mutex); -} - -void cqStop(void *handle) { - if (tsEnableStream == 0) { - return; - } - - SCqContext *pContext = handle; - cDebug("vgId:%d, stop all CQs", pContext->vgId); - if (pContext->dbConn == NULL || pContext->master == 0) return; - - pthread_mutex_lock(&pContext->mutex); - - pContext->master = 0; - SCqObj *pObj = pContext->pHead; - while (pObj) { - if (pObj->pStream) { - taos_close_stream(pObj->pStream); - pObj->pStream = NULL; - cInfo("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr); - } else { - taosTmrStop(pObj->tmrId); - pObj->tmrId = 0; - } - pObj = pObj->next; - } - - if (pContext->dbConn) taos_close(pContext->dbConn); - pContext->dbConn = NULL; - - pthread_mutex_unlock(&pContext->mutex); -} - -void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start) { - if (tsEnableStream == 0) { - return NULL; - } - SCqContext *pContext = handle; - int64_t rid = 0; - - pthread_mutex_lock(&pContext->mutex); - - SCqObj *pObj = pContext->pHead; - while (pObj) { - if (pObj->uid == uid) { - rid = pObj->rid; - pthread_mutex_unlock(&pContext->mutex); - return (void *)rid; - } - - pObj = pObj->next; - } - - pthread_mutex_unlock(&pContext->mutex); - - pObj = calloc(sizeof(SCqObj), 1); - if (pObj == NULL) return NULL; - - pObj->uid = uid; - pObj->tid = sid; - if (dstTable != NULL) { - pObj->dstTable = strdup(dstTable); - } - pObj->sqlStr = strdup(sqlStr); - - pObj->pSchema = tdDupSchema(pSchema); - pObj->rowSize = schemaTLen(pSchema); - - cInfo("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); - - pthread_mutex_lock(&pContext->mutex); - - pObj->next = pContext->pHead; - if (pContext->pHead) pContext->pHead->prev = pObj; - pContext->pHead = pObj; - - pContext->cqObjNum++; - - pObj->rid = taosAddRef(cqObjRef, pObj); - - if(start && pContext->master) { - cqCreateStream(pContext, pObj); - } else { - pObj->pContext = pContext; - } - - rid = pObj->rid; - - pthread_mutex_unlock(&pContext->mutex); - - - return (void *)rid; -} - -void cqDrop(void *handle) { - if (tsEnableStream == 0) { - return; - } - - SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)handle); - if (pObj == NULL) { - return; - } - - SCqContext *pContext = pObj->pContext; - - pthread_mutex_lock(&pContext->mutex); - - cqRmFromList(pObj); - - // free the resources associated - if (pObj->pStream) { - taos_close_stream(pObj->pStream); - pObj->pStream = NULL; - } else { - taosTmrStop(pObj->tmrId); - pObj->tmrId = 0; - } - - pthread_mutex_unlock(&pContext->mutex); - - taosRemoveRef(cqObjRef, (int64_t)handle); - taosReleaseRef(cqObjRef, (int64_t)handle); -} - -static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { - SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param); - if (pObj == NULL) { - return; - } - - SCqContext* pContext = pObj->pContext; - SSqlObj* pSql = (SSqlObj*)result; - if (code == TSDB_CODE_SUCCESS) { - if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) { - taos_close(pSql->pTscObj); - } - } - - pthread_mutex_lock(&pContext->mutex); - cqCreateStream(pContext, pObj); - pthread_mutex_unlock(&pContext->mutex); - - taosReleaseRef(cqObjRef, (int64_t)param); -} - -static void cqProcessCreateTimer(void *param, void *tmrId) { - SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param); - if (pObj == NULL) { - return; - } - - SCqContext* pContext = pObj->pContext; - - if (pContext->dbConn == NULL) { - cDebug("vgId:%d, try connect to TDengine", pContext->vgId); - taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL); - } else { - pthread_mutex_lock(&pContext->mutex); - cqCreateStream(pContext, pObj); - pthread_mutex_unlock(&pContext->mutex); - } - - taosReleaseRef(cqObjRef, (int64_t)param); -} - -// inner implement in tscStream.c -TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), - int64_t stime, void *param, void (*callback)(void *), void* cqhandle); - -static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { - pObj->pContext = pContext; - - if (pContext->dbConn == NULL) { - cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId); - pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl); - return; - } - - pObj->tmrId = 0; - - if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, \ - INT64_MIN, (void *)pObj->rid, NULL, pContext); - - // TODO the pObj->pStream may be released if error happens - if (pObj->pStream) { - pContext->num++; - cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr); - } else { - cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); - } - } -} - -static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { - SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param); - if (pObj == NULL) { - return; - } - - if (tres == NULL && row == NULL) { - taos_close_stream(pObj->pStream); - - pObj->pStream = NULL; - - taosReleaseRef(cqObjRef, (int64_t)param); - - return; - } - - SCqContext *pContext = pObj->pContext; - STSchema *pSchema = pObj->pSchema; - if (pObj->pStream == NULL) { - taosReleaseRef(cqObjRef, (int64_t)param); - return; - } - - cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); - - int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_MEM_ROW_DATA_HEAD_SIZE + pObj->rowSize; - char *buffer = calloc(size, 1); - - SWalHead *pHead = (SWalHead *)buffer; - SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead)); - SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); - - SMemRow trow = (SMemRow)pBlk->data; - SDataRow dataRow = (SDataRow)memRowDataBody(trow); - memRowSetType(trow, SMEM_ROW_DATA); - tdInitDataRow(dataRow, pSchema); - - for (int32_t i = 0; i < pSchema->numOfCols; i++) { - STColumn *c = pSchema->columns + i; - void *val = row[i]; - if (val == NULL) { - val = (void *)getNullValue(c->type); - } else if (c->type == TSDB_DATA_TYPE_BINARY) { - val = ((char*)val) - sizeof(VarDataLenT); - } else if (c->type == TSDB_DATA_TYPE_NCHAR) { - char buf[TSDB_MAX_NCHAR_LEN]; - int32_t len = taos_fetch_lengths(tres)[i]; - taosMbsToUcs4(val, len, buf, sizeof(buf), &len); - memcpy((char *)val + sizeof(VarDataLenT), buf, len); - varDataLen(val) = len; - } - tdAppendColVal(dataRow, val, c->type, c->offset); - } - pBlk->dataLen = htonl(memRowDataTLen(trow)); - pBlk->schemaLen = 0; - - pBlk->uid = htobe64(pObj->uid); - pBlk->tid = htonl(pObj->tid); - pBlk->numOfRows = htons(1); - pBlk->sversion = htonl(pSchema->version); - pBlk->padding = 0; - - pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowDataTLen(trow); - - pMsg->header.vgId = htonl(pContext->vgId); - pMsg->header.contLen = htonl(pHead->len); - pMsg->length = pMsg->header.contLen; - pMsg->numOfBlocks = htonl(1); - - pHead->msgType = TDMT_VND_SUBMIT; - pHead->version = 0; - - // write into vnode write queue - pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL); - free(buffer); - - taosReleaseRef(cqObjRef, (int64_t)param); -} - diff --git a/src/cq/test/CMakeLists.txt b/src/cq/test/CMakeLists.txt deleted file mode 100644 index d713dd7401..0000000000 --- a/src/cq/test/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20) -PROJECT(TDengine) - -LIST(APPEND CQTEST_SRC ./cqtest.c) -ADD_EXECUTABLE(cqtest ${CQTEST_SRC}) -TARGET_LINK_LIBRARIES(cqtest tcq taos_static) diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c deleted file mode 100644 index ee52152ad8..0000000000 --- a/src/cq/test/cqtest.c +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -//#define _DEFAULT_SOURCE -#include "os.h" -#include "taosdef.h" -#include "tmsg.h" -#include "tglobal.h" -#include "tlog.h" -#include "tcq.h" - -int64_t ver = 0; -void *pCq = NULL; - -int writeToQueue(int32_t vgId, void *data, int type, void *pMsg) { - return 0; -} - -int main(int argc, char *argv[]) { - int num = 3; - - for (int i=1; i - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#ifndef TDENGINE_QUERY_H -#define TDENGINE_QUERY_H - -#ifdef __cplusplus -extern "C" { -#endif - -typedef void* qinfo_t; - -/** - * create the qinfo object according to QueryTableMsg - * @param tsdb - * @param pQueryTableMsg - * @param qinfo - * @return - */ -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId); - - -/** - * the main query execution function, including query on both table and multitables, - * which are decided according to the tag or table name query conditions - * - * @param qinfo - * @return - */ -bool qTableQuery(qinfo_t qinfo, uint64_t *qId); - -/** - * Retrieve the produced results information, if current query is not paused or completed, - * this function will be blocked to wait for the query execution completed or paused, - * in which case enough results have been produced already. - * - * @param qinfo - * @return - */ -int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext); - -/** - * - * Retrieve the actual results to fill the response message payload. - * Note that this function must be executed after qRetrieveQueryResultInfo is invoked. - * - * @param qinfo qinfo object - * @param pRsp response message - * @param contLen payload length - * @return - */ -int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); - -/** - * - * @param qinfo - * @return - */ -void* qGetResultRetrieveMsg(qinfo_t qinfo); - -/** - * kill current ongoing query and free query handle automatically - * @param qinfo qhandle - * @return - */ -int32_t qKillQuery(qinfo_t qinfo); - -//kill by qid -int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount); - -bool qSolveCommitNoBlock(void* pRepo, void* pMgmt); - -int32_t qQueryCompleted(qinfo_t qinfo); - -/** - * destroy query info structure - * @param qHandle - */ -void qDestroyQueryInfo(qinfo_t qHandle); - -void* qOpenQueryMgmt(int32_t vgId); -void qQueryMgmtNotifyClosed(void* pExecutor); -void qQueryMgmtReOpen(void *pExecutor); -void qCleanupQueryMgmt(void* pExecutor); -void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo); -void** qAcquireQInfo(void* pMgmt, uint64_t key); -void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle); -bool checkQIdEqual(void *qHandle, uint64_t qId); -int64_t genQueryId(void); - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_QUERY_H diff --git a/src/inc/tcq.h b/src/inc/tcq.h deleted file mode 100644 index 71efe33011..0000000000 --- a/src/inc/tcq.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#ifndef _TD_CQ_H_ -#define _TD_CQ_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#include "tdataformat.h" - -typedef int32_t (*FCqWrite)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg); - -typedef struct { - int32_t vgId; - char user[TSDB_USER_LEN]; - char pass[TSDB_PASSWORD_LEN]; - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; // size must same with SVnodeObj.db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN] - FCqWrite cqWrite; -} SCqCfg; - -// SCqContext -typedef struct { - int32_t vgId; - int32_t master; - int32_t num; // number of continuous streams - char user[TSDB_USER_LEN]; - char pass[TSDB_PASSWORD_LEN]; - char db[TSDB_DB_NAME_LEN]; - FCqWrite cqWrite; - struct SCqObj *pHead; - void *dbConn; - void *tmrCtrl; - pthread_mutex_t mutex; - int32_t delete; - int32_t cqObjNum; -} SCqContext; - -// the following API shall be called by vnode -void *cqOpen(void *ahandle, const SCqCfg *pCfg); -void cqClose(void *handle); - -// if vnode is master, vnode call this API to start CQ -void cqStart(void *handle); - -// if vnode is slave/unsynced, vnode shall call this API to stop CQ -void cqStop(void *handle); - -// cqCreate is called by TSDB to start an instance of CQ -void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start); - -// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate -void cqDrop(void *handle); - -extern int32_t cqDebugFlag; - - -#ifdef __cplusplus -} -#endif - -#endif // _TD_CQ_H_ diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h deleted file mode 100644 index 130628e799..0000000000 --- a/src/inc/tsdb.h +++ /dev/null @@ -1,426 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#ifndef _TD_TSDB_H_ -#define _TD_TSDB_H_ - -#include -#include -#include - -#include "taosdef.h" -#include "tmsg.h" -#include "tarray.h" -#include "tdataformat.h" -#include "tname.h" -#include "hash.h" -#include "tlockfree.h" -#include "tlist.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#define TSDB_VERSION_MAJOR 1 -#define TSDB_VERSION_MINOR 0 - -#define TSDB_INVALID_SUPER_TABLE_ID -1 - -#define TSDB_STATUS_COMMIT_START 1 -#define TSDB_STATUS_COMMIT_OVER 2 -#define TSDB_STATUS_COMMIT_NOBLOCK 3 //commit no block, need to be solved - -// TSDB STATE DEFINITION -#define TSDB_STATE_OK 0x0 -#define TSDB_STATE_BAD_META 0x1 -#define TSDB_STATE_BAD_DATA 0x2 - -// --------- TSDB APPLICATION HANDLE DEFINITION -typedef struct { - void *appH; - void *cqH; - int (*notifyStatus)(void *, int status, int eno); - int (*eventCallBack)(void *); - void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema, int start); - void (*cqDropFunc)(void *handle); -} STsdbAppH; - -// --------- TSDB REPOSITORY CONFIGURATION DEFINITION -typedef struct { - int32_t tsdbId; - int32_t cacheBlockSize; - int32_t totalBlocks; - int32_t daysPerFile; // day per file sharding policy - int32_t keep; // day of data to keep - int32_t keep1; - int32_t keep2; - int32_t minRowsPerFileBlock; // minimum rows per file block - int32_t maxRowsPerFileBlock; // maximum rows per file block - int8_t precision; - int8_t compression; - int8_t update; - int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2 -} STsdbCfg; - -#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0) -#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0) -#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0) - -// --------- TSDB REPOSITORY USAGE STATISTICS -typedef struct { - int64_t totalStorage; // total bytes occupie - int64_t compStorage; - int64_t pointsWritten; // total data points written -} STsdbStat; - -typedef struct STsdbRepo STsdbRepo; - -STsdbCfg *tsdbGetCfg(const STsdbRepo *repo); - -// --------- TSDB REPOSITORY DEFINITION -int32_t tsdbCreateRepo(int repoid); -int32_t tsdbDropRepo(int repoid); -STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); -int tsdbCloseRepo(STsdbRepo *repo, int toCommit); -int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg); -int tsdbGetState(STsdbRepo *repo); -int8_t tsdbGetCompactState(STsdbRepo *repo); -// --------- TSDB TABLE DEFINITION -typedef struct { - uint64_t uid; // the unique table ID - int32_t tid; // the table ID in the repository. -} STableId; - -// --------- TSDB TABLE configuration -typedef struct { - ETableType type; - char * name; - STableId tableId; - int32_t sversion; - char * sname; // super table name - uint64_t superUid; - STSchema * schema; - STSchema * tagSchema; - SKVRow tagValues; - char * sql; -} STableCfg; - -void tsdbClearTableCfg(STableCfg *config); - -void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type, int16_t bytes); -char *tsdbGetTableName(void *pTable); - -#define TSDB_TABLEID(_table) ((STableId*) (_table)) -#define TSDB_PREV_ROW 0x1 -#define TSDB_NEXT_ROW 0x2 - -STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); - -int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg); -int tsdbDropTable(STsdbRepo *pRepo, STableId tableId); -int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg); - -uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); - -// the TSDB repository info -typedef struct STsdbRepoInfo { - STsdbCfg tsdbCfg; - uint64_t version; // version of the repository - int64_t tsdbTotalDataSize; // the original inserted data size - int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository - // TODO: Other informations to add -} STsdbRepoInfo; -STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo); - -// the meter information report structure -typedef struct { - STableCfg tableCfg; - uint64_t version; - int64_t tableTotalDataSize; // In bytes - int64_t tableTotalDiskSize; // In bytes -} STableInfo; - -// -- FOR INSERT DATA -/** - * Insert data to a table in a repository - * @param pRepo the TSDB repository handle - * @param pData the data to insert (will give a more specific description) - * - * @return the number of points inserted, -1 for failure and the error number is set - */ -int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp); - -// -- FOR QUERY TIME SERIES DATA - -typedef void *TsdbQueryHandleT; // Use void to hide implementation details - -#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 -#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 -#define BLOCK_LOAD_TABLE_RR_ORDER 3 - -// query condition to build multi-table data block iterator -typedef struct STsdbQueryCond { - STimeWindow twindow; - int32_t order; // desc|asc order to iterate the data block - int32_t numOfCols; - SColumnInfo *colList; - bool loadExternalRows; // load external rows or not - int32_t type; // data block load type: -} STsdbQueryCond; - -typedef struct STableData STableData; -typedef struct { - T_REF_DECLARE() - SRWLatch latch; - TSKEY keyFirst; - TSKEY keyLast; - int64_t numOfRows; - int32_t maxTables; - STableData **tData; - SList * actList; - SList * extraBuffList; - SList * bufBlockList; - int64_t pointsAdd; // TODO - int64_t storageAdd; // TODO -} SMemTable; - -typedef struct { - SMemTable* mem; - SMemTable* imem; - SMemTable mtable; - SMemTable* omem; -} SMemSnapshot; - -typedef struct SMemRef { - int32_t ref; - SMemSnapshot snapshot; -} SMemRef; - -typedef struct SDataBlockInfo { - STimeWindow window; - int32_t rows; - int32_t numOfCols; - int64_t uid; - int32_t tid; -} SDataBlockInfo; - -typedef struct SFileBlockInfo { - int32_t numBlocksOfStep; -} SFileBlockInfo; - -typedef struct { - void *pTable; - TSKEY lastKey; -} STableKeyInfo; - -typedef struct { - uint32_t numOfTables; - SArray *pGroupList; - SHashObj *map; // speedup acquire the tableQueryInfo by table uid -} STableGroupInfo; - -#define TSDB_BLOCK_DIST_STEP_ROWS 16 -typedef struct { - uint16_t rowSize; - uint16_t numOfFiles; - uint32_t numOfTables; - uint64_t totalSize; - uint64_t totalRows; - int32_t maxRows; - int32_t minRows; - int32_t firstSeekTimeUs; - uint32_t numOfRowsInMemTable; - uint32_t numOfSmallBlocks; - SArray *dataBlockInfos; -} STableBlockDist; - -/** - * Get the data block iterator, starting from position according to the query condition - * - * @param tsdb tsdb handle - * @param pCond query condition, including time window, result set order, and basic required columns for each block - * @param tableInfoGroup table object list in the form of set, grouped into different sets according to the - * group by condition - * @param qinfo query info handle from query processor - * @return - */ -TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, - SMemRef *pRef); - -/** - * Get the last row of the given query time window for all the tables in STableGroupInfo object. - * Note that only one data block with only row will be returned while invoking retrieve data block function for - * all tables in this group. - * - * @param tsdb tsdb handle - * @param pCond query condition, including time window, result set order, and basic required columns for each block - * @param tableInfo table list. - * @return - */ -TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId, - SMemRef *pRef); - - -TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef); - -bool isTsdbCacheLastRow(TsdbQueryHandleT* pTsdbReadHandle); - - -/** - * get the queried table object list - * @param pHandle - * @return - */ -SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle); - -/** - * get the group list according to table id from client - * @param tsdb - * @param pCond - * @param groupList - * @param qinfo - * @return - */ -TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, - uint64_t qId, SMemRef *pRef); - - -/** - * get num of rows in mem table - * - * @param pHandle - * @return row size - */ - -int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle); - -/** - * move to next block if exists - * - * @param pTsdbReadHandle - * @return - */ -bool tsdbNextDataBlock(TsdbQueryHandleT pTsdbReadHandle); - -/** - * Get current data block information - * - * @param pTsdbReadHandle - * @param pBlockInfo - * @return - */ -void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); - -/** - * - * Get the pre-calculated information w.r.t. current data block. - * - * In case of data block in cache, the pBlockStatis will always be NULL. - * If a block is not completed loaded from disk, the pBlockStatis will be NULL. - - * @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0 - * @return - */ -int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataStatis **pBlockStatis); - -/** - * - * The query condition with primary timestamp is passed to iterator during its constructor function, - * the returned data block must be satisfied with the time window condition in any cases, - * which means the SData data block is not actually the completed disk data blocks. - * - * @param pTsdbReadHandle query handle - * @param pColumnIdList required data columns id list - * @return - */ -SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pTsdbReadHandle, SArray *pColumnIdList); - -/** - * Get the qualified table id for a super table according to the tag query expression. - * @param stableid. super table sid - * @param pTagCond. tag query condition - */ -int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len, - int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList, - SColIndex *pColIndex, int32_t numOfCols); - -/** - * destroy the created table group list, which is generated by tag query - * @param pGroupList - */ -void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); - -/** - * create the table group result including only one table, used to handle the normal table query - * - * @param tsdb tsdbHandle - * @param uid table uid - * @param pGroupInfo the generated result - * @return - */ -int32_t tsdbGetOneTableGroup(STsdbRepo *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); - -/** - * - * @param tsdb - * @param pTableIdList - * @param pGroupInfo - * @return - */ -int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); - -/** - * clean up the query handle - * @param queryHandle - */ -void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle); - -void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond); - -void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList); - -int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo); - -/** - * get the statistics of repo usage - * @param repo. point to the tsdbrepo - * @param totalPoints. total data point written - * @param totalStorage. total bytes took by the tsdb - * @param compStorage. total bytes took by the tsdb after compressed - */ -void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage); - -int tsdbInitCommitQueue(); -void tsdbDestroyCommitQueue(); -int tsdbSyncCommit(STsdbRepo *repo); -void tsdbIncCommitRef(int vgId); -void tsdbDecCommitRef(int vgId); - -// For TSDB file sync -int tsdbSyncSend(void *pRepo, SOCKET socketFd); -int tsdbSyncRecv(void *pRepo, SOCKET socketFd); - -// For TSDB Compact -int tsdbCompact(STsdbRepo *pRepo); - -// For TSDB Health Monitor - -// no problem return true -bool tsdbNoProblem(STsdbRepo* pRepo); - -#ifdef __cplusplus -} -#endif - -#endif // _TD_TSDB_H_