remove files
This commit is contained in:
parent
68f45ebef7
commit
3a6a714a94
|
@ -22,7 +22,6 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
#include "tfs.h"
|
#include "tfs.h"
|
||||||
#include "tglobal.h"
|
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
|
||||||
|
|
|
@ -1,15 +0,0 @@
|
||||||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
|
|
||||||
PROJECT(TDengine)
|
|
||||||
|
|
||||||
INCLUDE_DIRECTORIES(inc)
|
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
|
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
|
|
||||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
|
|
||||||
|
|
||||||
ADD_LIBRARY(tcq ${SRC})
|
|
||||||
IF (TD_SOMODE_STATIC)
|
|
||||||
TARGET_LINK_LIBRARIES(tcq tutil common taos_static)
|
|
||||||
ELSE ()
|
|
||||||
TARGET_LINK_LIBRARIES(tcq tutil common taos)
|
|
||||||
ENDIF ()
|
|
||||||
ADD_SUBDIRECTORY(test)
|
|
|
@ -1,532 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
|
||||||
|
|
||||||
#include <errno.h>
|
|
||||||
#include <pthread.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include <string.h>
|
|
||||||
|
|
||||||
#include "../../../include/client/taos.h"
|
|
||||||
#include "taosdef.h"
|
|
||||||
#include "tmsg.h"
|
|
||||||
#include "tcq.h"
|
|
||||||
#include "tdataformat.h"
|
|
||||||
#include "tglobal.h"
|
|
||||||
#include "tlog.h"
|
|
||||||
#include "tsclient.h"
|
|
||||||
#include "ttimer.h"
|
|
||||||
#include "twal.h"
|
|
||||||
|
|
||||||
#define cFatal(...) { if (cqDebugFlag & DEBUG_FATAL) { taosPrintLog("CQ FATAL ", 255, __VA_ARGS__); }}
|
|
||||||
#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("CQ ERROR ", 255, __VA_ARGS__); }}
|
|
||||||
#define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("CQ WARN ", 255, __VA_ARGS__); }}
|
|
||||||
#define cInfo(...) { if (cqDebugFlag & DEBUG_INFO) { taosPrintLog("CQ ", 255, __VA_ARGS__); }}
|
|
||||||
#define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
|
|
||||||
#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
|
|
||||||
|
|
||||||
|
|
||||||
typedef struct SCqObj {
|
|
||||||
tmr_h tmrId;
|
|
||||||
int64_t rid;
|
|
||||||
uint64_t uid;
|
|
||||||
int32_t tid; // table ID
|
|
||||||
int32_t rowSize; // bytes of a row
|
|
||||||
char * dstTable;
|
|
||||||
char * sqlStr; // SQL string
|
|
||||||
STSchema * pSchema; // pointer to schema array
|
|
||||||
void * pStream;
|
|
||||||
struct SCqObj *prev;
|
|
||||||
struct SCqObj *next;
|
|
||||||
SCqContext * pContext;
|
|
||||||
} SCqObj;
|
|
||||||
|
|
||||||
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
|
|
||||||
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
|
|
||||||
|
|
||||||
int32_t cqObjRef = -1;
|
|
||||||
int32_t cqVnodeNum = 0;
|
|
||||||
|
|
||||||
void cqRmFromList(SCqObj *pObj) {
|
|
||||||
//LOCK in caller
|
|
||||||
|
|
||||||
SCqContext *pContext = pObj->pContext;
|
|
||||||
|
|
||||||
if (pObj->prev) {
|
|
||||||
pObj->prev->next = pObj->next;
|
|
||||||
} else {
|
|
||||||
pContext->pHead = pObj->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pObj->next) {
|
|
||||||
pObj->next->prev = pObj->prev;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
static void freeSCqContext(void *handle) {
|
|
||||||
if (handle == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SCqContext *pContext = handle;
|
|
||||||
pthread_mutex_destroy(&pContext->mutex);
|
|
||||||
|
|
||||||
taosTmrCleanUp(pContext->tmrCtrl);
|
|
||||||
pContext->tmrCtrl = NULL;
|
|
||||||
cDebug("vgId:%d, CQ is closed", pContext->vgId);
|
|
||||||
free(pContext);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void cqFree(void *handle) {
|
|
||||||
if (tsEnableStream == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SCqObj *pObj = handle;
|
|
||||||
SCqContext *pContext = pObj->pContext;
|
|
||||||
int32_t delete = 0;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
|
||||||
|
|
||||||
// free the resources associated
|
|
||||||
if (pObj->pStream) {
|
|
||||||
taos_close_stream(pObj->pStream);
|
|
||||||
pObj->pStream = NULL;
|
|
||||||
} else {
|
|
||||||
taosTmrStop(pObj->tmrId);
|
|
||||||
pObj->tmrId = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
|
|
||||||
tdFreeSchema(pObj->pSchema);
|
|
||||||
free(pObj->dstTable);
|
|
||||||
free(pObj->sqlStr);
|
|
||||||
free(pObj);
|
|
||||||
|
|
||||||
pContext->cqObjNum--;
|
|
||||||
|
|
||||||
if (pContext->cqObjNum <= 0 && pContext->delete) {
|
|
||||||
delete = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
|
|
||||||
if (delete) {
|
|
||||||
freeSCqContext(pContext);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void cqCreateRef() {
|
|
||||||
int32_t ref = atomic_load_32(&cqObjRef);
|
|
||||||
if (ref == -1) {
|
|
||||||
ref = taosOpenRef(4096, cqFree);
|
|
||||||
|
|
||||||
if (atomic_val_compare_exchange_32(&cqObjRef, -1, ref) != -1) {
|
|
||||||
taosCloseRef(ref);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
|
||||||
if (tsEnableStream == 0) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
SCqContext *pContext = calloc(sizeof(SCqContext), 1);
|
|
||||||
if (pContext == NULL) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic_add_fetch_32(&cqVnodeNum, 1);
|
|
||||||
|
|
||||||
cqCreateRef();
|
|
||||||
|
|
||||||
pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ");
|
|
||||||
|
|
||||||
tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user));
|
|
||||||
tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass));
|
|
||||||
const char* db = pCfg->db;
|
|
||||||
for (const char* p = db; *p != 0; p++) {
|
|
||||||
if (*p == '.') {
|
|
||||||
db = p + 1;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tstrncpy(pContext->db, db, sizeof(pContext->db));
|
|
||||||
pContext->vgId = pCfg->vgId;
|
|
||||||
pContext->cqWrite = pCfg->cqWrite;
|
|
||||||
tscEmbedded = 1;
|
|
||||||
|
|
||||||
pthread_mutex_init(&pContext->mutex, NULL);
|
|
||||||
|
|
||||||
|
|
||||||
cDebug("vgId:%d, CQ is opened", pContext->vgId);
|
|
||||||
|
|
||||||
return pContext;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void cqClose(void *handle) {
|
|
||||||
if (tsEnableStream == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SCqContext *pContext = handle;
|
|
||||||
if (handle == NULL) return;
|
|
||||||
|
|
||||||
pContext->delete = 1;
|
|
||||||
int32_t hasCq = 0;
|
|
||||||
int32_t existLoop = 0;
|
|
||||||
|
|
||||||
// stop all CQs
|
|
||||||
cqStop(pContext);
|
|
||||||
|
|
||||||
int64_t rid = 0;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
|
||||||
|
|
||||||
SCqObj *pObj = pContext->pHead;
|
|
||||||
if (pObj) {
|
|
||||||
cqRmFromList(pObj);
|
|
||||||
|
|
||||||
rid = pObj->rid;
|
|
||||||
|
|
||||||
hasCq = 1;
|
|
||||||
|
|
||||||
if (pContext->pHead == NULL) {
|
|
||||||
existLoop = 1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
|
|
||||||
taosRemoveRef(cqObjRef, rid);
|
|
||||||
|
|
||||||
if (existLoop) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (hasCq == 0) {
|
|
||||||
freeSCqContext(pContext);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t remainn = atomic_sub_fetch_32(&cqVnodeNum, 1);
|
|
||||||
if (remainn <= 0) {
|
|
||||||
int32_t ref = cqObjRef;
|
|
||||||
cqObjRef = -1;
|
|
||||||
taosCloseRef(ref);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void cqStart(void *handle) {
|
|
||||||
if (tsEnableStream == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SCqContext *pContext = handle;
|
|
||||||
if (pContext->dbConn || pContext->master) return;
|
|
||||||
|
|
||||||
cDebug("vgId:%d, start all CQs", pContext->vgId);
|
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
|
||||||
|
|
||||||
pContext->master = 1;
|
|
||||||
|
|
||||||
SCqObj *pObj = pContext->pHead;
|
|
||||||
while (pObj) {
|
|
||||||
cqCreateStream(pContext, pObj);
|
|
||||||
pObj = pObj->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
void cqStop(void *handle) {
|
|
||||||
if (tsEnableStream == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SCqContext *pContext = handle;
|
|
||||||
cDebug("vgId:%d, stop all CQs", pContext->vgId);
|
|
||||||
if (pContext->dbConn == NULL || pContext->master == 0) return;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
|
||||||
|
|
||||||
pContext->master = 0;
|
|
||||||
SCqObj *pObj = pContext->pHead;
|
|
||||||
while (pObj) {
|
|
||||||
if (pObj->pStream) {
|
|
||||||
taos_close_stream(pObj->pStream);
|
|
||||||
pObj->pStream = NULL;
|
|
||||||
cInfo("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr);
|
|
||||||
} else {
|
|
||||||
taosTmrStop(pObj->tmrId);
|
|
||||||
pObj->tmrId = 0;
|
|
||||||
}
|
|
||||||
pObj = pObj->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pContext->dbConn) taos_close(pContext->dbConn);
|
|
||||||
pContext->dbConn = NULL;
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start) {
|
|
||||||
if (tsEnableStream == 0) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
SCqContext *pContext = handle;
|
|
||||||
int64_t rid = 0;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
|
||||||
|
|
||||||
SCqObj *pObj = pContext->pHead;
|
|
||||||
while (pObj) {
|
|
||||||
if (pObj->uid == uid) {
|
|
||||||
rid = pObj->rid;
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
return (void *)rid;
|
|
||||||
}
|
|
||||||
|
|
||||||
pObj = pObj->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
|
|
||||||
pObj = calloc(sizeof(SCqObj), 1);
|
|
||||||
if (pObj == NULL) return NULL;
|
|
||||||
|
|
||||||
pObj->uid = uid;
|
|
||||||
pObj->tid = sid;
|
|
||||||
if (dstTable != NULL) {
|
|
||||||
pObj->dstTable = strdup(dstTable);
|
|
||||||
}
|
|
||||||
pObj->sqlStr = strdup(sqlStr);
|
|
||||||
|
|
||||||
pObj->pSchema = tdDupSchema(pSchema);
|
|
||||||
pObj->rowSize = schemaTLen(pSchema);
|
|
||||||
|
|
||||||
cInfo("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
|
||||||
|
|
||||||
pObj->next = pContext->pHead;
|
|
||||||
if (pContext->pHead) pContext->pHead->prev = pObj;
|
|
||||||
pContext->pHead = pObj;
|
|
||||||
|
|
||||||
pContext->cqObjNum++;
|
|
||||||
|
|
||||||
pObj->rid = taosAddRef(cqObjRef, pObj);
|
|
||||||
|
|
||||||
if(start && pContext->master) {
|
|
||||||
cqCreateStream(pContext, pObj);
|
|
||||||
} else {
|
|
||||||
pObj->pContext = pContext;
|
|
||||||
}
|
|
||||||
|
|
||||||
rid = pObj->rid;
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
|
|
||||||
|
|
||||||
return (void *)rid;
|
|
||||||
}
|
|
||||||
|
|
||||||
void cqDrop(void *handle) {
|
|
||||||
if (tsEnableStream == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)handle);
|
|
||||||
if (pObj == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SCqContext *pContext = pObj->pContext;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
|
||||||
|
|
||||||
cqRmFromList(pObj);
|
|
||||||
|
|
||||||
// free the resources associated
|
|
||||||
if (pObj->pStream) {
|
|
||||||
taos_close_stream(pObj->pStream);
|
|
||||||
pObj->pStream = NULL;
|
|
||||||
} else {
|
|
||||||
taosTmrStop(pObj->tmrId);
|
|
||||||
pObj->tmrId = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
|
|
||||||
taosRemoveRef(cqObjRef, (int64_t)handle);
|
|
||||||
taosReleaseRef(cqObjRef, (int64_t)handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
|
|
||||||
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
|
|
||||||
if (pObj == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SCqContext* pContext = pObj->pContext;
|
|
||||||
SSqlObj* pSql = (SSqlObj*)result;
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
|
||||||
if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) {
|
|
||||||
taos_close(pSql->pTscObj);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
|
||||||
cqCreateStream(pContext, pObj);
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
|
|
||||||
taosReleaseRef(cqObjRef, (int64_t)param);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void cqProcessCreateTimer(void *param, void *tmrId) {
|
|
||||||
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
|
|
||||||
if (pObj == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SCqContext* pContext = pObj->pContext;
|
|
||||||
|
|
||||||
if (pContext->dbConn == NULL) {
|
|
||||||
cDebug("vgId:%d, try connect to TDengine", pContext->vgId);
|
|
||||||
taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL);
|
|
||||||
} else {
|
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
|
||||||
cqCreateStream(pContext, pObj);
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosReleaseRef(cqObjRef, (int64_t)param);
|
|
||||||
}
|
|
||||||
|
|
||||||
// inner implement in tscStream.c
|
|
||||||
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
|
||||||
int64_t stime, void *param, void (*callback)(void *), void* cqhandle);
|
|
||||||
|
|
||||||
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
|
||||||
pObj->pContext = pContext;
|
|
||||||
|
|
||||||
if (pContext->dbConn == NULL) {
|
|
||||||
cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId);
|
|
||||||
pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pObj->tmrId = 0;
|
|
||||||
|
|
||||||
if (pObj->pStream == NULL) {
|
|
||||||
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, \
|
|
||||||
INT64_MIN, (void *)pObj->rid, NULL, pContext);
|
|
||||||
|
|
||||||
// TODO the pObj->pStream may be released if error happens
|
|
||||||
if (pObj->pStream) {
|
|
||||||
pContext->num++;
|
|
||||||
cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr);
|
|
||||||
} else {
|
|
||||||
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
|
||||||
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
|
|
||||||
if (pObj == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tres == NULL && row == NULL) {
|
|
||||||
taos_close_stream(pObj->pStream);
|
|
||||||
|
|
||||||
pObj->pStream = NULL;
|
|
||||||
|
|
||||||
taosReleaseRef(cqObjRef, (int64_t)param);
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SCqContext *pContext = pObj->pContext;
|
|
||||||
STSchema *pSchema = pObj->pSchema;
|
|
||||||
if (pObj->pStream == NULL) {
|
|
||||||
taosReleaseRef(cqObjRef, (int64_t)param);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
|
|
||||||
|
|
||||||
int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_MEM_ROW_DATA_HEAD_SIZE + pObj->rowSize;
|
|
||||||
char *buffer = calloc(size, 1);
|
|
||||||
|
|
||||||
SWalHead *pHead = (SWalHead *)buffer;
|
|
||||||
SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead));
|
|
||||||
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
|
|
||||||
|
|
||||||
SMemRow trow = (SMemRow)pBlk->data;
|
|
||||||
SDataRow dataRow = (SDataRow)memRowDataBody(trow);
|
|
||||||
memRowSetType(trow, SMEM_ROW_DATA);
|
|
||||||
tdInitDataRow(dataRow, pSchema);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
|
|
||||||
STColumn *c = pSchema->columns + i;
|
|
||||||
void *val = row[i];
|
|
||||||
if (val == NULL) {
|
|
||||||
val = (void *)getNullValue(c->type);
|
|
||||||
} else if (c->type == TSDB_DATA_TYPE_BINARY) {
|
|
||||||
val = ((char*)val) - sizeof(VarDataLenT);
|
|
||||||
} else if (c->type == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
char buf[TSDB_MAX_NCHAR_LEN];
|
|
||||||
int32_t len = taos_fetch_lengths(tres)[i];
|
|
||||||
taosMbsToUcs4(val, len, buf, sizeof(buf), &len);
|
|
||||||
memcpy((char *)val + sizeof(VarDataLenT), buf, len);
|
|
||||||
varDataLen(val) = len;
|
|
||||||
}
|
|
||||||
tdAppendColVal(dataRow, val, c->type, c->offset);
|
|
||||||
}
|
|
||||||
pBlk->dataLen = htonl(memRowDataTLen(trow));
|
|
||||||
pBlk->schemaLen = 0;
|
|
||||||
|
|
||||||
pBlk->uid = htobe64(pObj->uid);
|
|
||||||
pBlk->tid = htonl(pObj->tid);
|
|
||||||
pBlk->numOfRows = htons(1);
|
|
||||||
pBlk->sversion = htonl(pSchema->version);
|
|
||||||
pBlk->padding = 0;
|
|
||||||
|
|
||||||
pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowDataTLen(trow);
|
|
||||||
|
|
||||||
pMsg->header.vgId = htonl(pContext->vgId);
|
|
||||||
pMsg->header.contLen = htonl(pHead->len);
|
|
||||||
pMsg->length = pMsg->header.contLen;
|
|
||||||
pMsg->numOfBlocks = htonl(1);
|
|
||||||
|
|
||||||
pHead->msgType = TDMT_VND_SUBMIT;
|
|
||||||
pHead->version = 0;
|
|
||||||
|
|
||||||
// write into vnode write queue
|
|
||||||
pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL);
|
|
||||||
free(buffer);
|
|
||||||
|
|
||||||
taosReleaseRef(cqObjRef, (int64_t)param);
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,6 +0,0 @@
|
||||||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
|
|
||||||
PROJECT(TDengine)
|
|
||||||
|
|
||||||
LIST(APPEND CQTEST_SRC ./cqtest.c)
|
|
||||||
ADD_EXECUTABLE(cqtest ${CQTEST_SRC})
|
|
||||||
TARGET_LINK_LIBRARIES(cqtest tcq taos_static)
|
|
|
@ -1,108 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
//#define _DEFAULT_SOURCE
|
|
||||||
#include "os.h"
|
|
||||||
#include "taosdef.h"
|
|
||||||
#include "tmsg.h"
|
|
||||||
#include "tglobal.h"
|
|
||||||
#include "tlog.h"
|
|
||||||
#include "tcq.h"
|
|
||||||
|
|
||||||
int64_t ver = 0;
|
|
||||||
void *pCq = NULL;
|
|
||||||
|
|
||||||
int writeToQueue(int32_t vgId, void *data, int type, void *pMsg) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
|
||||||
int num = 3;
|
|
||||||
|
|
||||||
for (int i=1; i<argc; ++i) {
|
|
||||||
if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
|
||||||
dDebugFlag = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-n") == 0 && i <argc-1) {
|
|
||||||
num = atoi(argv[++i]);
|
|
||||||
} else {
|
|
||||||
printf("\nusage: %s [options] \n", argv[0]);
|
|
||||||
printf(" [-n num]: number of streams, default:%d\n", num);
|
|
||||||
printf(" [-d debugFlag]: debug flag, default:%d\n", dDebugFlag);
|
|
||||||
printf(" [-h help]: print out this help\n\n");
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosInitLog("cq.log", 100000, 10);
|
|
||||||
|
|
||||||
SCqCfg cqCfg;
|
|
||||||
strcpy(cqCfg.user, TSDB_DEFAULT_USER);
|
|
||||||
strcpy(cqCfg.pass, TSDB_DEFAULT_PASS);
|
|
||||||
cqCfg.vgId = 2;
|
|
||||||
cqCfg.cqWrite = writeToQueue;
|
|
||||||
|
|
||||||
pCq = cqOpen(NULL, &cqCfg);
|
|
||||||
if (pCq == NULL) {
|
|
||||||
printf("failed to open CQ\n");
|
|
||||||
exit(-1);
|
|
||||||
}
|
|
||||||
|
|
||||||
STSchemaBuilder schemaBuilder = {0};
|
|
||||||
|
|
||||||
tdInitTSchemaBuilder(&schemaBuilder, 0);
|
|
||||||
tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
|
|
||||||
tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_INT, 1, 4);
|
|
||||||
|
|
||||||
STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
|
|
||||||
|
|
||||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
|
||||||
|
|
||||||
for (int sid =1; sid<10; ++sid) {
|
|
||||||
cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
tdFreeSchema(pSchema);
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
char c = (char)getchar();
|
|
||||||
|
|
||||||
switch(c) {
|
|
||||||
case 's':
|
|
||||||
cqStart(pCq);
|
|
||||||
break;
|
|
||||||
case 't':
|
|
||||||
cqStop(pCq);
|
|
||||||
break;
|
|
||||||
case 'c':
|
|
||||||
// create a CQ
|
|
||||||
break;
|
|
||||||
case 'd':
|
|
||||||
// drop a CQ
|
|
||||||
break;
|
|
||||||
case 'q':
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
printf("invalid command:%c", c);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (c=='q') break;
|
|
||||||
}
|
|
||||||
|
|
||||||
cqClose(pCq);
|
|
||||||
|
|
||||||
taosCloseLog();
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
106
src/inc/query.h
106
src/inc/query.h
|
@ -1,106 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
#ifndef TDENGINE_QUERY_H
|
|
||||||
#define TDENGINE_QUERY_H
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef void* qinfo_t;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* create the qinfo object according to QueryTableMsg
|
|
||||||
* @param tsdb
|
|
||||||
* @param pQueryTableMsg
|
|
||||||
* @param qinfo
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId);
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* the main query execution function, including query on both table and multitables,
|
|
||||||
* which are decided according to the tag or table name query conditions
|
|
||||||
*
|
|
||||||
* @param qinfo
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
bool qTableQuery(qinfo_t qinfo, uint64_t *qId);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieve the produced results information, if current query is not paused or completed,
|
|
||||||
* this function will be blocked to wait for the query execution completed or paused,
|
|
||||||
* in which case enough results have been produced already.
|
|
||||||
*
|
|
||||||
* @param qinfo
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext);
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Retrieve the actual results to fill the response message payload.
|
|
||||||
* Note that this function must be executed after qRetrieveQueryResultInfo is invoked.
|
|
||||||
*
|
|
||||||
* @param qinfo qinfo object
|
|
||||||
* @param pRsp response message
|
|
||||||
* @param contLen payload length
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @param qinfo
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
void* qGetResultRetrieveMsg(qinfo_t qinfo);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* kill current ongoing query and free query handle automatically
|
|
||||||
* @param qinfo qhandle
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
int32_t qKillQuery(qinfo_t qinfo);
|
|
||||||
|
|
||||||
//kill by qid
|
|
||||||
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount);
|
|
||||||
|
|
||||||
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt);
|
|
||||||
|
|
||||||
int32_t qQueryCompleted(qinfo_t qinfo);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* destroy query info structure
|
|
||||||
* @param qHandle
|
|
||||||
*/
|
|
||||||
void qDestroyQueryInfo(qinfo_t qHandle);
|
|
||||||
|
|
||||||
void* qOpenQueryMgmt(int32_t vgId);
|
|
||||||
void qQueryMgmtNotifyClosed(void* pExecutor);
|
|
||||||
void qQueryMgmtReOpen(void *pExecutor);
|
|
||||||
void qCleanupQueryMgmt(void* pExecutor);
|
|
||||||
void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo);
|
|
||||||
void** qAcquireQInfo(void* pMgmt, uint64_t key);
|
|
||||||
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
|
|
||||||
bool checkQIdEqual(void *qHandle, uint64_t qId);
|
|
||||||
int64_t genQueryId(void);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif // TDENGINE_QUERY_H
|
|
|
@ -1,74 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
#ifndef _TD_CQ_H_
|
|
||||||
#define _TD_CQ_H_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#include "tdataformat.h"
|
|
||||||
|
|
||||||
typedef int32_t (*FCqWrite)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg);
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t vgId;
|
|
||||||
char user[TSDB_USER_LEN];
|
|
||||||
char pass[TSDB_PASSWORD_LEN];
|
|
||||||
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; // size must same with SVnodeObj.db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]
|
|
||||||
FCqWrite cqWrite;
|
|
||||||
} SCqCfg;
|
|
||||||
|
|
||||||
// SCqContext
|
|
||||||
typedef struct {
|
|
||||||
int32_t vgId;
|
|
||||||
int32_t master;
|
|
||||||
int32_t num; // number of continuous streams
|
|
||||||
char user[TSDB_USER_LEN];
|
|
||||||
char pass[TSDB_PASSWORD_LEN];
|
|
||||||
char db[TSDB_DB_NAME_LEN];
|
|
||||||
FCqWrite cqWrite;
|
|
||||||
struct SCqObj *pHead;
|
|
||||||
void *dbConn;
|
|
||||||
void *tmrCtrl;
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
int32_t delete;
|
|
||||||
int32_t cqObjNum;
|
|
||||||
} SCqContext;
|
|
||||||
|
|
||||||
// the following API shall be called by vnode
|
|
||||||
void *cqOpen(void *ahandle, const SCqCfg *pCfg);
|
|
||||||
void cqClose(void *handle);
|
|
||||||
|
|
||||||
// if vnode is master, vnode call this API to start CQ
|
|
||||||
void cqStart(void *handle);
|
|
||||||
|
|
||||||
// if vnode is slave/unsynced, vnode shall call this API to stop CQ
|
|
||||||
void cqStop(void *handle);
|
|
||||||
|
|
||||||
// cqCreate is called by TSDB to start an instance of CQ
|
|
||||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start);
|
|
||||||
|
|
||||||
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
|
|
||||||
void cqDrop(void *handle);
|
|
||||||
|
|
||||||
extern int32_t cqDebugFlag;
|
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif // _TD_CQ_H_
|
|
426
src/inc/tsdb.h
426
src/inc/tsdb.h
|
@ -1,426 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
#ifndef _TD_TSDB_H_
|
|
||||||
#define _TD_TSDB_H_
|
|
||||||
|
|
||||||
#include <pthread.h>
|
|
||||||
#include <stdbool.h>
|
|
||||||
#include <stdint.h>
|
|
||||||
|
|
||||||
#include "taosdef.h"
|
|
||||||
#include "tmsg.h"
|
|
||||||
#include "tarray.h"
|
|
||||||
#include "tdataformat.h"
|
|
||||||
#include "tname.h"
|
|
||||||
#include "hash.h"
|
|
||||||
#include "tlockfree.h"
|
|
||||||
#include "tlist.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#define TSDB_VERSION_MAJOR 1
|
|
||||||
#define TSDB_VERSION_MINOR 0
|
|
||||||
|
|
||||||
#define TSDB_INVALID_SUPER_TABLE_ID -1
|
|
||||||
|
|
||||||
#define TSDB_STATUS_COMMIT_START 1
|
|
||||||
#define TSDB_STATUS_COMMIT_OVER 2
|
|
||||||
#define TSDB_STATUS_COMMIT_NOBLOCK 3 //commit no block, need to be solved
|
|
||||||
|
|
||||||
// TSDB STATE DEFINITION
|
|
||||||
#define TSDB_STATE_OK 0x0
|
|
||||||
#define TSDB_STATE_BAD_META 0x1
|
|
||||||
#define TSDB_STATE_BAD_DATA 0x2
|
|
||||||
|
|
||||||
// --------- TSDB APPLICATION HANDLE DEFINITION
|
|
||||||
typedef struct {
|
|
||||||
void *appH;
|
|
||||||
void *cqH;
|
|
||||||
int (*notifyStatus)(void *, int status, int eno);
|
|
||||||
int (*eventCallBack)(void *);
|
|
||||||
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema, int start);
|
|
||||||
void (*cqDropFunc)(void *handle);
|
|
||||||
} STsdbAppH;
|
|
||||||
|
|
||||||
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
|
|
||||||
typedef struct {
|
|
||||||
int32_t tsdbId;
|
|
||||||
int32_t cacheBlockSize;
|
|
||||||
int32_t totalBlocks;
|
|
||||||
int32_t daysPerFile; // day per file sharding policy
|
|
||||||
int32_t keep; // day of data to keep
|
|
||||||
int32_t keep1;
|
|
||||||
int32_t keep2;
|
|
||||||
int32_t minRowsPerFileBlock; // minimum rows per file block
|
|
||||||
int32_t maxRowsPerFileBlock; // maximum rows per file block
|
|
||||||
int8_t precision;
|
|
||||||
int8_t compression;
|
|
||||||
int8_t update;
|
|
||||||
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2
|
|
||||||
} STsdbCfg;
|
|
||||||
|
|
||||||
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
|
|
||||||
#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0)
|
|
||||||
#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0)
|
|
||||||
|
|
||||||
// --------- TSDB REPOSITORY USAGE STATISTICS
|
|
||||||
typedef struct {
|
|
||||||
int64_t totalStorage; // total bytes occupie
|
|
||||||
int64_t compStorage;
|
|
||||||
int64_t pointsWritten; // total data points written
|
|
||||||
} STsdbStat;
|
|
||||||
|
|
||||||
typedef struct STsdbRepo STsdbRepo;
|
|
||||||
|
|
||||||
STsdbCfg *tsdbGetCfg(const STsdbRepo *repo);
|
|
||||||
|
|
||||||
// --------- TSDB REPOSITORY DEFINITION
|
|
||||||
int32_t tsdbCreateRepo(int repoid);
|
|
||||||
int32_t tsdbDropRepo(int repoid);
|
|
||||||
STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
|
|
||||||
int tsdbCloseRepo(STsdbRepo *repo, int toCommit);
|
|
||||||
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg);
|
|
||||||
int tsdbGetState(STsdbRepo *repo);
|
|
||||||
int8_t tsdbGetCompactState(STsdbRepo *repo);
|
|
||||||
// --------- TSDB TABLE DEFINITION
|
|
||||||
typedef struct {
|
|
||||||
uint64_t uid; // the unique table ID
|
|
||||||
int32_t tid; // the table ID in the repository.
|
|
||||||
} STableId;
|
|
||||||
|
|
||||||
// --------- TSDB TABLE configuration
|
|
||||||
typedef struct {
|
|
||||||
ETableType type;
|
|
||||||
char * name;
|
|
||||||
STableId tableId;
|
|
||||||
int32_t sversion;
|
|
||||||
char * sname; // super table name
|
|
||||||
uint64_t superUid;
|
|
||||||
STSchema * schema;
|
|
||||||
STSchema * tagSchema;
|
|
||||||
SKVRow tagValues;
|
|
||||||
char * sql;
|
|
||||||
} STableCfg;
|
|
||||||
|
|
||||||
void tsdbClearTableCfg(STableCfg *config);
|
|
||||||
|
|
||||||
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type, int16_t bytes);
|
|
||||||
char *tsdbGetTableName(void *pTable);
|
|
||||||
|
|
||||||
#define TSDB_TABLEID(_table) ((STableId*) (_table))
|
|
||||||
#define TSDB_PREV_ROW 0x1
|
|
||||||
#define TSDB_NEXT_ROW 0x2
|
|
||||||
|
|
||||||
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
|
|
||||||
|
|
||||||
int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg);
|
|
||||||
int tsdbDropTable(STsdbRepo *pRepo, STableId tableId);
|
|
||||||
int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg);
|
|
||||||
|
|
||||||
uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
|
|
||||||
|
|
||||||
// the TSDB repository info
|
|
||||||
typedef struct STsdbRepoInfo {
|
|
||||||
STsdbCfg tsdbCfg;
|
|
||||||
uint64_t version; // version of the repository
|
|
||||||
int64_t tsdbTotalDataSize; // the original inserted data size
|
|
||||||
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
|
|
||||||
// TODO: Other informations to add
|
|
||||||
} STsdbRepoInfo;
|
|
||||||
STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo);
|
|
||||||
|
|
||||||
// the meter information report structure
|
|
||||||
typedef struct {
|
|
||||||
STableCfg tableCfg;
|
|
||||||
uint64_t version;
|
|
||||||
int64_t tableTotalDataSize; // In bytes
|
|
||||||
int64_t tableTotalDiskSize; // In bytes
|
|
||||||
} STableInfo;
|
|
||||||
|
|
||||||
// -- FOR INSERT DATA
|
|
||||||
/**
|
|
||||||
* Insert data to a table in a repository
|
|
||||||
* @param pRepo the TSDB repository handle
|
|
||||||
* @param pData the data to insert (will give a more specific description)
|
|
||||||
*
|
|
||||||
* @return the number of points inserted, -1 for failure and the error number is set
|
|
||||||
*/
|
|
||||||
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
|
|
||||||
|
|
||||||
// -- FOR QUERY TIME SERIES DATA
|
|
||||||
|
|
||||||
typedef void *TsdbQueryHandleT; // Use void to hide implementation details
|
|
||||||
|
|
||||||
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
|
|
||||||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
|
||||||
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
|
||||||
|
|
||||||
// query condition to build multi-table data block iterator
|
|
||||||
typedef struct STsdbQueryCond {
|
|
||||||
STimeWindow twindow;
|
|
||||||
int32_t order; // desc|asc order to iterate the data block
|
|
||||||
int32_t numOfCols;
|
|
||||||
SColumnInfo *colList;
|
|
||||||
bool loadExternalRows; // load external rows or not
|
|
||||||
int32_t type; // data block load type:
|
|
||||||
} STsdbQueryCond;
|
|
||||||
|
|
||||||
typedef struct STableData STableData;
|
|
||||||
typedef struct {
|
|
||||||
T_REF_DECLARE()
|
|
||||||
SRWLatch latch;
|
|
||||||
TSKEY keyFirst;
|
|
||||||
TSKEY keyLast;
|
|
||||||
int64_t numOfRows;
|
|
||||||
int32_t maxTables;
|
|
||||||
STableData **tData;
|
|
||||||
SList * actList;
|
|
||||||
SList * extraBuffList;
|
|
||||||
SList * bufBlockList;
|
|
||||||
int64_t pointsAdd; // TODO
|
|
||||||
int64_t storageAdd; // TODO
|
|
||||||
} SMemTable;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMemTable* mem;
|
|
||||||
SMemTable* imem;
|
|
||||||
SMemTable mtable;
|
|
||||||
SMemTable* omem;
|
|
||||||
} SMemSnapshot;
|
|
||||||
|
|
||||||
typedef struct SMemRef {
|
|
||||||
int32_t ref;
|
|
||||||
SMemSnapshot snapshot;
|
|
||||||
} SMemRef;
|
|
||||||
|
|
||||||
typedef struct SDataBlockInfo {
|
|
||||||
STimeWindow window;
|
|
||||||
int32_t rows;
|
|
||||||
int32_t numOfCols;
|
|
||||||
int64_t uid;
|
|
||||||
int32_t tid;
|
|
||||||
} SDataBlockInfo;
|
|
||||||
|
|
||||||
typedef struct SFileBlockInfo {
|
|
||||||
int32_t numBlocksOfStep;
|
|
||||||
} SFileBlockInfo;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
void *pTable;
|
|
||||||
TSKEY lastKey;
|
|
||||||
} STableKeyInfo;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
uint32_t numOfTables;
|
|
||||||
SArray *pGroupList;
|
|
||||||
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
|
|
||||||
} STableGroupInfo;
|
|
||||||
|
|
||||||
#define TSDB_BLOCK_DIST_STEP_ROWS 16
|
|
||||||
typedef struct {
|
|
||||||
uint16_t rowSize;
|
|
||||||
uint16_t numOfFiles;
|
|
||||||
uint32_t numOfTables;
|
|
||||||
uint64_t totalSize;
|
|
||||||
uint64_t totalRows;
|
|
||||||
int32_t maxRows;
|
|
||||||
int32_t minRows;
|
|
||||||
int32_t firstSeekTimeUs;
|
|
||||||
uint32_t numOfRowsInMemTable;
|
|
||||||
uint32_t numOfSmallBlocks;
|
|
||||||
SArray *dataBlockInfos;
|
|
||||||
} STableBlockDist;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the data block iterator, starting from position according to the query condition
|
|
||||||
*
|
|
||||||
* @param tsdb tsdb handle
|
|
||||||
* @param pCond query condition, including time window, result set order, and basic required columns for each block
|
|
||||||
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
|
|
||||||
* group by condition
|
|
||||||
* @param qinfo query info handle from query processor
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
|
|
||||||
SMemRef *pRef);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
|
|
||||||
* Note that only one data block with only row will be returned while invoking retrieve data block function for
|
|
||||||
* all tables in this group.
|
|
||||||
*
|
|
||||||
* @param tsdb tsdb handle
|
|
||||||
* @param pCond query condition, including time window, result set order, and basic required columns for each block
|
|
||||||
* @param tableInfo table list.
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
|
|
||||||
SMemRef *pRef);
|
|
||||||
|
|
||||||
|
|
||||||
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef);
|
|
||||||
|
|
||||||
bool isTsdbCacheLastRow(TsdbQueryHandleT* pTsdbReadHandle);
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* get the queried table object list
|
|
||||||
* @param pHandle
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* get the group list according to table id from client
|
|
||||||
* @param tsdb
|
|
||||||
* @param pCond
|
|
||||||
* @param groupList
|
|
||||||
* @param qinfo
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
|
|
||||||
uint64_t qId, SMemRef *pRef);
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* get num of rows in mem table
|
|
||||||
*
|
|
||||||
* @param pHandle
|
|
||||||
* @return row size
|
|
||||||
*/
|
|
||||||
|
|
||||||
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* move to next block if exists
|
|
||||||
*
|
|
||||||
* @param pTsdbReadHandle
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
bool tsdbNextDataBlock(TsdbQueryHandleT pTsdbReadHandle);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get current data block information
|
|
||||||
*
|
|
||||||
* @param pTsdbReadHandle
|
|
||||||
* @param pBlockInfo
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Get the pre-calculated information w.r.t. current data block.
|
|
||||||
*
|
|
||||||
* In case of data block in cache, the pBlockStatis will always be NULL.
|
|
||||||
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
|
|
||||||
|
|
||||||
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataStatis **pBlockStatis);
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* The query condition with primary timestamp is passed to iterator during its constructor function,
|
|
||||||
* the returned data block must be satisfied with the time window condition in any cases,
|
|
||||||
* which means the SData data block is not actually the completed disk data blocks.
|
|
||||||
*
|
|
||||||
* @param pTsdbReadHandle query handle
|
|
||||||
* @param pColumnIdList required data columns id list
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pTsdbReadHandle, SArray *pColumnIdList);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the qualified table id for a super table according to the tag query expression.
|
|
||||||
* @param stableid. super table sid
|
|
||||||
* @param pTagCond. tag query condition
|
|
||||||
*/
|
|
||||||
int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len,
|
|
||||||
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList,
|
|
||||||
SColIndex *pColIndex, int32_t numOfCols);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* destroy the created table group list, which is generated by tag query
|
|
||||||
* @param pGroupList
|
|
||||||
*/
|
|
||||||
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* create the table group result including only one table, used to handle the normal table query
|
|
||||||
*
|
|
||||||
* @param tsdb tsdbHandle
|
|
||||||
* @param uid table uid
|
|
||||||
* @param pGroupInfo the generated result
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
int32_t tsdbGetOneTableGroup(STsdbRepo *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @param tsdb
|
|
||||||
* @param pTableIdList
|
|
||||||
* @param pGroupInfo
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* clean up the query handle
|
|
||||||
* @param queryHandle
|
|
||||||
*/
|
|
||||||
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
|
|
||||||
|
|
||||||
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
|
|
||||||
|
|
||||||
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList);
|
|
||||||
|
|
||||||
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* get the statistics of repo usage
|
|
||||||
* @param repo. point to the tsdbrepo
|
|
||||||
* @param totalPoints. total data point written
|
|
||||||
* @param totalStorage. total bytes took by the tsdb
|
|
||||||
* @param compStorage. total bytes took by the tsdb after compressed
|
|
||||||
*/
|
|
||||||
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);
|
|
||||||
|
|
||||||
int tsdbInitCommitQueue();
|
|
||||||
void tsdbDestroyCommitQueue();
|
|
||||||
int tsdbSyncCommit(STsdbRepo *repo);
|
|
||||||
void tsdbIncCommitRef(int vgId);
|
|
||||||
void tsdbDecCommitRef(int vgId);
|
|
||||||
|
|
||||||
// For TSDB file sync
|
|
||||||
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
|
|
||||||
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
|
|
||||||
|
|
||||||
// For TSDB Compact
|
|
||||||
int tsdbCompact(STsdbRepo *pRepo);
|
|
||||||
|
|
||||||
// For TSDB Health Monitor
|
|
||||||
|
|
||||||
// no problem return true
|
|
||||||
bool tsdbNoProblem(STsdbRepo* pRepo);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif // _TD_TSDB_H_
|
|
Loading…
Reference in New Issue