Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

This commit is contained in:
Minghao Li 2022-03-14 20:44:09 +08:00
commit e3ef9d49e4
37 changed files with 1356 additions and 305 deletions

View File

@ -61,6 +61,12 @@ typedef enum {
TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired
} ETsdbSmaStat; } ETsdbSmaStat;
typedef enum {
TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA
TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA
TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA
} ETsdbSmaType;
extern char *qtypeStr[]; extern char *qtypeStr[];
#define TSDB_PORT_HTTP 11 #define TSDB_PORT_HTTP 11

View File

@ -101,6 +101,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_PROJECT, QUERY_NODE_LOGIC_PLAN_PROJECT,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF, QUERY_NODE_LOGIC_PLAN_VNODE_MODIF,
QUERY_NODE_LOGIC_PLAN_EXCHANGE, QUERY_NODE_LOGIC_PLAN_EXCHANGE,
QUERY_NODE_LOGIC_PLAN_WINDOW,
QUERY_NODE_LOGIC_SUBPLAN, QUERY_NODE_LOGIC_SUBPLAN,
QUERY_NODE_LOGIC_PLAN, QUERY_NODE_LOGIC_PLAN,
@ -115,6 +116,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_AGG, QUERY_NODE_PHYSICAL_PLAN_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_SORT, QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_SUBPLAN, QUERY_NODE_PHYSICAL_SUBPLAN,

View File

@ -80,6 +80,22 @@ typedef struct SExchangeLogicNode {
int32_t srcGroupId; int32_t srcGroupId;
} SExchangeLogicNode; } SExchangeLogicNode;
typedef enum EWindowType {
WINDOW_TYPE_INTERVAL = 1,
WINDOW_TYPE_SESSION,
WINDOW_TYPE_STATE
} EWindowType;
typedef struct SWindowLogicNode {
SLogicNode node;
EWindowType winType;
SNodeList* pFuncs;
int64_t interval;
int64_t offset;
int64_t sliding;
SFillNode* pFill;
} SWindowLogicNode;
typedef enum ESubplanType { typedef enum ESubplanType {
SUBPLAN_TYPE_MERGE = 1, SUBPLAN_TYPE_MERGE = 1,
SUBPLAN_TYPE_PARTIAL, SUBPLAN_TYPE_PARTIAL,
@ -191,6 +207,16 @@ typedef struct SExchangePhysiNode {
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode; } SExchangePhysiNode;
typedef struct SIntervalPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of parameter expression of function
SNodeList* pFuncs;
int64_t interval;
int64_t offset;
int64_t sliding;
SFillNode* pFill;
} SIntervalPhysiNode;
typedef struct SDataSinkNode { typedef struct SDataSinkNode {
ENodeType type; ENodeType type;
SDataBlockDescNode* pInputDataBlockDesc; SDataBlockDescNode* pInputDataBlockDesc;

View File

@ -71,6 +71,10 @@ typedef struct SRpcInit {
// call back to keep conn or not // call back to keep conn or not
bool (*pfp)(void *parent, tmsg_t msgType); bool (*pfp)(void *parent, tmsg_t msgType);
// to support Send messages multiple times on a link
//
void* (*mfp)(void *parent, tmsg_t msgType);
void *parent; void *parent;
} SRpcInit; } SRpcInit;
@ -89,6 +93,9 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp)
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
// just release client conn to rpc instance, no close sock
void rpcReleaseHandle(void *handle);
void rpcRefHandle(void *handle, int8_t type); void rpcRefHandle(void *handle, int8_t type);
void rpcUnrefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type);

View File

@ -17,12 +17,16 @@
#define _TD_OS_LOCALE_H_ #define _TD_OS_LOCALE_H_
#include "os.h" #include "os.h"
#include "osString.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define setlocale SETLOCALE_FUNC_TAOS_FORBID
#endif
char *taosCharsetReplace(char *charsetstr); char *taosCharsetReplace(char *charsetstr);
void taosGetSystemLocale(char *outLocale, char *outCharset); void taosGetSystemLocale(char *outLocale, char *outCharset);
void taosSetSystemLocale(const char *inLocale, const char *inCharSet); void taosSetSystemLocale(const char *inLocale, const char *inCharSet);

View File

@ -353,6 +353,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) #define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614)
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) #define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) #define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0617)
// query // query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)

View File

@ -195,11 +195,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) { int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
pRequest->type = pQuery->msgType; pRequest->type = pQuery->msgType;
SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot, .acctId = pRequest->pTscObj->acctId }; SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot, .acctId = pRequest->pTscObj->acctId };
int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList); return qCreateQueryPlan(&cxt, pPlan, pNodeList);
if (code != 0) {
return code;
}
return code;
} }
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) { void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {

View File

@ -95,6 +95,7 @@ int tsdbCommit(STsdb *pTsdb);
* @return int32_t * @return int32_t
*/ */
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg); int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg);
/** /**
* @brief Insert RSma(Time-range-wise Rollup SMA) data. * @brief Insert RSma(Time-range-wise Rollup SMA) data.
@ -105,6 +106,12 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
*/ */
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey,
int32_t nMaxResult);
// STsdbCfg // STsdbCfg
int tsdbOptionsInit(STsdbCfg *); int tsdbOptionsInit(STsdbCfg *);
void tsdbOptionsClear(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *);

View File

@ -0,0 +1,44 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_DB_DEF_H_
#define _TD_TSDB_DB_DEF_H_
#include "db.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SDBFile SDBFile;
typedef DB_ENV* TDBEnv;
struct SDBFile {
DB* pDB;
char* path;
};
int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF);
void tsdbCloseDBF(SDBFile* pDBF);
int32_t tsdbOpenBDBEnv(DB_ENV** ppEnv, const char* path);
void tsdbCloseBDBEnv(DB_ENV* pEnv);
int32_t tsdbSaveSmaToDB(SDBFile* pDBF, void* key, uint32_t keySize, void* data, uint32_t dataSize);
void* tsdbGetSmaDataByKey(SDBFile* pDBF, void* key, uint32_t keySize, uint32_t* valueSize);
#ifdef __cplusplus
}
#endif
#endif /*_TD_TSDB_DB_DEF_H_*/

View File

@ -27,6 +27,7 @@
#include "ttime.h" #include "ttime.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbDBDef.h"
#include "tsdbCommit.h" #include "tsdbCommit.h"
#include "tsdbFS.h" #include "tsdbFS.h"
#include "tsdbFile.h" #include "tsdbFile.h"
@ -37,12 +38,15 @@
#include "tsdbReadImpl.h" #include "tsdbReadImpl.h"
#include "tsdbSma.h" #include "tsdbSma.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
struct STsdb { struct STsdb {
int32_t vgId; int32_t vgId;
bool repoLocked;
pthread_mutex_t mutex;
char * path; char * path;
STsdbCfg config; STsdbCfg config;
STsdbMemTable * mem; STsdbMemTable * mem;
@ -52,12 +56,17 @@ struct STsdb {
STsdbFS * fs; STsdbFS * fs;
SMeta * pMeta; SMeta * pMeta;
STfs * pTfs; STfs * pTfs;
SSmaStat * pSmaStat; SSmaEnv * pTSmaEnv;
SSmaEnv * pRSmaEnv;
}; };
#define REPO_ID(r) ((r)->vgId) #define REPO_ID(r) ((r)->vgId)
#define REPO_CFG(r) (&(r)->config) #define REPO_CFG(r) (&(r)->config)
#define REPO_FS(r) (r)->fs #define REPO_FS(r) (r)->fs
#define IS_REPO_LOCKED(r) (r)->repoLocked
int tsdbLockRepo(STsdb *pTsdb);
int tsdbUnlockRepo(STsdb *pTsdb);
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) { static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) {
return pTable->pSchema; return pTable->pSchema;

View File

@ -329,21 +329,23 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
// =============== SDFileSet // =============== SDFileSet
typedef struct { typedef struct {
int fid; int fid;
int8_t state; // -128~127 int8_t state; // -128~127
uint8_t ver; // 0~255, DFileSet version uint8_t ver; // 0~255, DFileSet version
uint16_t reserve; uint16_t reserve;
SDFile files[TSDB_FILE_MAX]; SDFile files[TSDB_FILE_MAX];
} SDFileSet; } SDFileSet;
typedef struct { typedef struct {
int fid; int fid;
int8_t state; int8_t state;
uint8_t ver; uint8_t ver;
uint16_t reserve;
#if 0 #if 0
SDFInfo info; SDFInfo info;
#endif #endif
STfsFile f; STfsFile f;
TdFilePtr pFile; TdFilePtr pFile;
} SSFile; // files split by days with fid } SSFile; // files split by days with fid
#define TSDB_LATEST_FSET_VER 0 #define TSDB_LATEST_FSET_VER 0

View File

@ -17,27 +17,29 @@
#define _TD_TSDB_SMA_H_ #define _TD_TSDB_SMA_H_
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
typedef struct SSmaEnv SSmaEnv;
// insert/update interface struct SSmaEnv {
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); pthread_rwlock_t lock;
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); TDBEnv dbEnv;
char * path;
SSmaStat * pStat;
};
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_ENV(env) ((env)->dbEnv)
#define SMA_ENV_PATH(env) ((env)->path)
#define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
// query interface void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv);
// TODO: This is the basic params, and should wrap the params to a queryHandle. void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv);
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult);
// management interface
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg);
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
#if 0 #if 0
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
#endif #endif
// internal func // internal func
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) {
int32_t len = 0; int32_t len = 0;
len += taosEncodeFixedI64(pData, tableUid); len += taosEncodeFixedI64(pData, tableUid);
@ -46,4 +48,31 @@ static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId,
return len; return len;
} }
static FORCE_INLINE int tsdbRLockSma(SSmaEnv *pEnv) {
int code = pthread_rwlock_rdlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int tsdbWLockSma(SSmaEnv *pEnv) {
int code = pthread_rwlock_wrlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int tsdbUnLockSma(SSmaEnv *pEnv) {
int code = pthread_rwlock_unlock(&(pEnv->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
#endif /* _TD_TSDB_SMA_H_ */ #endif /* _TD_TSDB_SMA_H_ */

View File

@ -231,30 +231,31 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
void *pBuf = NULL, *qBuf = NULL; void *pBuf = NULL, *qBuf = NULL;
DBT key1 = {0}, value1 = {0}; DBT key1 = {0}, value1 = {0};
{ // save sma info
// save sma info int32_t len = tEncodeTSma(NULL, pSmaCfg);
int32_t len = tEncodeTSma(NULL, pSmaCfg); pBuf = calloc(len, 1);
pBuf = calloc(len, 1); if (pBuf == NULL) {
if (pBuf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY; return -1;
return -1;
}
key1.data = (void *)&pSmaCfg->indexUid;
key1.size = sizeof(pSmaCfg->indexUid);
qBuf = pBuf;
tEncodeTSma(&qBuf, pSmaCfg);
value1.data = pBuf;
value1.size = POINTER_DISTANCE(qBuf, pBuf);
value1.app_data = pSmaCfg;
} }
key1.data = (void *)&pSmaCfg->indexUid;
key1.size = sizeof(pSmaCfg->indexUid);
qBuf = pBuf;
tEncodeTSma(&qBuf, pSmaCfg);
value1.data = pBuf;
value1.size = POINTER_DISTANCE(qBuf, pBuf);
value1.app_data = pSmaCfg;
metaDBWLock(pMeta->pDB); metaDBWLock(pMeta->pDB);
pMeta->pDB->pSmaDB->put(pMeta->pDB->pSmaDB, NULL, &key1, &value1, 0); pMeta->pDB->pSmaDB->put(pMeta->pDB->pSmaDB, NULL, &key1, &value1, 0);
metaDBULock(pMeta->pDB); metaDBULock(pMeta->pDB);
// release
tfree(pBuf);
return 0; return 0;
} }

View File

@ -12,3 +12,162 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define ALLOW_FORBID_FUNC
#include "db.h"
#include "taoserror.h"
#include "tcoding.h"
#include "thash.h"
#include "tsdbDBDef.h"
#include "tsdbLog.h"
#define IMPL_WITH_LOCK 1
static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup);
static void tsdbCloseBDBDb(DB *pDB);
#define BDB_PERR(info, code) fprintf(stderr, "%s:%d " info " reason: %s\n", __FILE__, __LINE__, db_strerror(code))
int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) {
// TDBEnv is shared by a group of SDBFile
if (!pEnv) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
// Open DBF
if (tsdbOpenBDBDb(&(pDBF->pDB), pEnv, pDBF->path, false) < 0) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
tsdbCloseBDBDb(pDBF->pDB);
return -1;
}
return 0;
}
void tsdbCloseDBF(SDBFile *pDBF) {
if (pDBF->pDB) {
tsdbCloseBDBDb(pDBF->pDB);
pDBF->pDB = NULL;
}
tfree(pDBF->path);
}
int32_t tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
int ret = 0;
DB_ENV *pEnv = NULL;
if (path == NULL) return 0;
ret = db_env_create(&pEnv, 0);
if (ret != 0) {
BDB_PERR("Failed to create tsdb env", ret);
return -1;
}
ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0);
if (ret != 0) {
// BDB_PERR("Failed to open tsdb env", ret);
tsdbWarn("Failed to open tsdb env for path %s since %d", path ? path : "NULL", ret);
return -1;
}
*ppEnv = pEnv;
return 0;
}
void tsdbCloseBDBEnv(DB_ENV *pEnv) {
if (pEnv) {
pEnv->close(pEnv, 0);
}
}
static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) {
int ret;
DB *pDB;
ret = db_create(&(pDB), pEnv, 0);
if (ret != 0) {
BDB_PERR("Failed to create DBP", ret);
return -1;
}
if (isDup) {
ret = pDB->set_flags(pDB, DB_DUPSORT);
if (ret != 0) {
BDB_PERR("Failed to set DB flags", ret);
return -1;
}
}
ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0);
if (ret) {
BDB_PERR("Failed to open DBF", ret);
return -1;
}
*ppDB = pDB;
return 0;
}
static void tsdbCloseBDBDb(DB *pDB) {
if (pDB) {
pDB->close(pDB, 0);
}
}
int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *key, uint32_t keySize, void *data, uint32_t dataSize) {
int ret;
DBT key1 = {0}, value1 = {0};
key1.data = key;
key1.size = keySize;
value1.data = data;
value1.size = dataSize;
// TODO: lock
ret = pDBF->pDB->put(pDBF->pDB, NULL, &key1, &value1, 0);
if (ret) {
BDB_PERR("Failed to put data to DBF", ret);
// TODO: unlock
return -1;
}
// TODO: unlock
return 0;
}
void *tsdbGetSmaDataByKey(SDBFile *pDBF, void* key, uint32_t keySize, uint32_t *valueSize) {
void *result = NULL;
DBT key1 = {0};
DBT value1 = {0};
int ret;
// Set key/value
key1.data = key;
key1.size = keySize;
// Query
// TODO: lock
ret = pDBF->pDB->get(pDBF->pDB, NULL, &key1, &value1, 0);
// TODO: unlock
if (ret != 0) {
return NULL;
}
result = calloc(1, value1.size);
if (result == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
*valueSize = value1.size;
memcpy(result, value1.data, value1.size);
return result;
}

View File

@ -80,6 +80,8 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
pTsdb->pmaf = pMAF; pTsdb->pmaf = pMAF;
pTsdb->pMeta = pMeta; pTsdb->pMeta = pMeta;
pTsdb->pTfs = pTfs; pTsdb->pTfs = pTfs;
pTsdb->pTSmaEnv = NULL;
pTsdb->pRSmaEnv = NULL;
pTsdb->fs = tsdbNewFS(pTsdbCfg); pTsdb->fs = tsdbNewFS(pTsdbCfg);
@ -88,8 +90,9 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
static void tsdbFree(STsdb *pTsdb) { static void tsdbFree(STsdb *pTsdb) {
if (pTsdb) { if (pTsdb) {
tsdbFreeSmaEnv(pTsdb->pRSmaEnv);
tsdbFreeSmaEnv(pTsdb->pTSmaEnv);
tsdbFreeFS(pTsdb->fs); tsdbFreeFS(pTsdb->fs);
tsdbDestroySmaState(pTsdb->pSmaStat);
tfree(pTsdb->path); tfree(pTsdb->path);
free(pTsdb); free(pTsdb);
} }
@ -105,6 +108,30 @@ static void tsdbCloseImpl(STsdb *pTsdb) {
tsdbCloseFS(pTsdb); tsdbCloseFS(pTsdb);
// TODO // TODO
} }
int tsdbLockRepo(STsdb *pTsdb) {
int code = pthread_mutex_lock(&pTsdb->mutex);
if (code != 0) {
tsdbError("vgId:%d failed to lock tsdb since %s", REPO_ID(pTsdb), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
pTsdb->repoLocked = true;
return 0;
}
int tsdbUnlockRepo(STsdb *pTsdb) {
ASSERT(IS_REPO_LOCKED(pTsdb));
pTsdb->repoLocked = false;
int code = pthread_mutex_unlock(&pTsdb->mutex);
if (code != 0) {
tsdbError("vgId:%d failed to unlock tsdb since %s", REPO_ID(pTsdb), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
#if 0 #if 0
/* /*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>

View File

@ -15,7 +15,9 @@
#include "tsdbDef.h" #include "tsdbDef.h"
#undef SMA_PRINT_DEBUG_LOG
#define SMA_STORAGE_TSDB_DAYS 30 #define SMA_STORAGE_TSDB_DAYS 30
#define SMA_STORAGE_TSDB_TIMES 30
#define SMA_STORAGE_SPLIT_HOURS 24 #define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8 #define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
@ -23,7 +25,7 @@
#define SMA_STATE_ITEM_HASH_SLOT 32 #define SMA_STATE_ITEM_HASH_SLOT 32
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test #define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
#define SMA_TEST_INDEX_UID 123456 // TODO: just for test #define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test
typedef enum { typedef enum {
SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat
SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat
@ -31,23 +33,22 @@ typedef enum {
typedef struct { typedef struct {
STsdb * pTsdb; STsdb * pTsdb;
char * pDFile; // TODO: use the real DFile type, not char* SDBFile dFile;
int32_t interval; // interval with the precision of DB int32_t interval; // interval with the precision of DB
// TODO
} STSmaWriteH; } STSmaWriteH;
typedef struct { typedef struct {
int32_t iter; int32_t iter;
int32_t fid;
} SmaFsIter; } SmaFsIter;
typedef struct { typedef struct {
STsdb * pTsdb; STsdb * pTsdb;
char * pDFile; // TODO: use the real DFile type, not char* SDBFile dFile;
int32_t interval; // interval with the precision of DB int32_t interval; // interval with the precision of DB
int32_t blockSize; // size of SMA block item int32_t blockSize; // size of SMA block item
int8_t storageLevel; int8_t storageLevel;
int8_t days; int8_t days;
SmaFsIter smaFsIter; SmaFsIter smaFsIter;
// TODO
} STSmaReadH; } STSmaReadH;
typedef struct { typedef struct {
@ -68,18 +69,117 @@ struct SSmaStat {
}; };
// declaration of static functions // declaration of static functions
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit); // TODO: This is the basic params, and should wrap the params to a queryHandle.
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen); int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey,
int32_t nMaxResult);
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg);
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path);
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv);
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit);
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData);
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen);
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel);
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid); static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid);
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey);
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin); SSmaEnv *pEnv = NULL;
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin);
pEnv = (SSmaEnv *)calloc(1, sizeof(SSmaEnv));
if (pEnv == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int code = pthread_rwlock_init(&(pEnv->lock), NULL);
if (code) {
terrno = TAOS_SYSTEM_ERROR(code);
free(pEnv);
return NULL;
}
ASSERT(path && (strlen(path) > 0));
pEnv->path = strdup(path);
if (pEnv->path == NULL) {
tsdbFreeSmaEnv(pEnv);
return NULL;
}
if (tsdbInitSmaStat(&pEnv->pStat) != TSDB_CODE_SUCCESS) {
tsdbFreeSmaEnv(pEnv);
return NULL;
}
if (tsdbOpenBDBEnv(&pEnv->dbEnv, pEnv->path) != TSDB_CODE_SUCCESS) {
tsdbFreeSmaEnv(pEnv);
return NULL;
}
return pEnv;
}
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
if (!pEnv) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED;
}
if (pEnv && *pEnv) {
return TSDB_CODE_SUCCESS;
}
if (tsdbLockRepo(pTsdb) != 0) {
return TSDB_CODE_FAILED;
}
if (*pEnv == NULL) {
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) {
tsdbUnlockRepo(pTsdb);
return TSDB_CODE_FAILED;
}
}
if (tsdbUnlockRepo(pTsdb) != 0) {
tsdbFreeSmaEnv(*pEnv);
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief Release resources allocated for its member fields, not including itself.
*
* @param pSmaEnv
* @return int32_t
*/
void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) {
if (pSmaEnv) {
tsdbDestroySmaState(pSmaEnv->pStat);
tfree(pSmaEnv->pStat);
tfree(pSmaEnv->path);
pthread_rwlock_destroy(&(pSmaEnv->lock));
tsdbCloseBDBEnv(pSmaEnv->dbEnv);
}
}
void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) {
tsdbDestroySmaEnv(pSmaEnv);
tfree(pSmaEnv);
return NULL;
}
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
ASSERT(pSmaStat != NULL); ASSERT(pSmaStat != NULL);
@ -125,6 +225,12 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
return pItem; return pItem;
} }
/**
* @brief Release resources allocated for its member fields, not including itself.
*
* @param pSmaStat
* @return int32_t
*/
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
if (pSmaStat) { if (pSmaStat) {
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
@ -135,7 +241,6 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
item = taosHashIterate(pSmaStat->smaStatItems, item); item = taosHashIterate(pSmaStat->smaStatItems, item);
} }
taosHashCleanup(pSmaStat->smaStatItems); taosHashCleanup(pSmaStat->smaStatItems);
free(pSmaStat);
} }
} }
@ -143,22 +248,35 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
* @brief Update expired window according to msg from stream computing module. * @brief Update expired window according to msg from stream computing module.
* *
* @param pTsdb * @param pTsdb
* @param smaType ETsdbSmaType
* @param msg * @param msg
* @return int32_t * @return int32_t
*/ */
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
if (msg == NULL) { STsdbCfg *pCfg = REPO_CFG(pTsdb);
SSmaEnv * pEnv = NULL;
if (!msg || !pTsdb->pMeta) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
// lazy mode char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/";
if (tsdbInitSmaStat(&pTsdb->pSmaStat) != TSDB_CODE_SUCCESS) { if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
pTsdb->pTSmaEnv = pEnv;
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
pTsdb->pRSmaEnv = pEnv;
} else {
ASSERT(0);
}
// TODO: decode the msg => start // TODO: decode the msg => start
int64_t indexUid = SMA_TEST_INDEX_UID; int64_t indexUid = SMA_TEST_INDEX_UID;
const char * indexName = SMA_TEST_INDEX_NAME; // const char * indexName = SMA_TEST_INDEX_NAME;
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
@ -167,9 +285,9 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
} }
// TODO: decode the msg <= end // TODO: decode the msg <= end
SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems; SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName)); SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
if (pItem == NULL) { if (pItem == NULL) {
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state
if (pItem == NULL) { if (pItem == NULL) {
@ -181,20 +299,28 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
// cache smaMeta // cache smaMeta
STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid); STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid);
if (pSma == NULL) { if (pSma == NULL) {
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
taosHashCleanup(pItem->expiredWindows); taosHashCleanup(pItem->expiredWindows);
free(pItem); free(pItem);
tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid,
tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pItem->pSma = pSma; pItem->pSma = pSma;
// TODO: change indexName to indexUid // TODO: change indexName to indexUid
if (taosHashPut(pItemsHash, indexName, strnlen(indexName, TSDB_INDEX_NAME_LEN), &pItem, sizeof(pItem)) != 0) { if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) {
// If error occurs during put smaStatItem, free the resources of pItem // If error occurs during put smaStatItem, free the resources of pItem
taosHashCleanup(pItem->expiredWindows); taosHashCleanup(pItem->expiredWindows);
free(pItem); free(pItem);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} }
#if 0
SSmaStatItem *pItem1 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
int size1 = taosHashGetSize(pItem1->expiredWindows);
tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d before hashPut", REPO_ID(pTsdb), indexUid, size1);
#endif
int8_t state = TSDB_SMA_STAT_EXPIRED; int8_t state = TSDB_SMA_STAT_EXPIRED;
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
@ -207,21 +333,28 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
// windows failed to put into hash table. // windows failed to put into hash table.
taosHashCleanup(pItem->expiredWindows); taosHashCleanup(pItem->expiredWindows);
tfree(pItem->pSma); tfree(pItem->pSma);
taosHashRemove(pItemsHash, indexName, sizeof(indexName)); taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} }
#if 0
SSmaStatItem *pItem2 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
int size2 = taosHashGetSize(pItem1->expiredWindows);
tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d after hashPut", REPO_ID(pTsdb), indexUid, size2);
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey) { static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
SSmaStatItem *pItem = NULL; SSmaStatItem *pItem = NULL;
if (pTsdb->pSmaStat && pTsdb->pSmaStat->smaStatItems) { // TODO: If HASH_ENTRY_LOCK used, whether rwlock needed to handle cases of removing hashNode?
pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &indexUid, sizeof(indexUid)); if (pStat && pStat->smaStatItems) {
pItem = (SSmaStatItem *)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
} }
#if 0
if (pItem != NULL) { if (pItem != NULL) {
// TODO: reset time window for the sma data blocks // TODO: reset time window for the sma data blocks
if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) { if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
@ -231,6 +364,7 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey
} else { } else {
// error handling // error handling
} }
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -241,7 +375,7 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey
* @param intervalUnit * @param intervalUnit
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) { static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
// TODO: configurable for SMA_STORAGE_SPLIT_HOURS? // TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
switch (intervalUnit) { switch (intervalUnit) {
case TD_TIME_UNIT_HOUR: case TD_TIME_UNIT_HOUR:
@ -281,18 +415,35 @@ static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) {
} }
/** /**
* @brief Insert TSma data blocks to B+Tree * @brief Insert TSma data blocks to DB File build by B+Tree
* *
* @param bTree * @param pSmaH
* @param smaKey * @param smaKey
* @param keyLen
* @param pData * @param pData
* @param dataLen * @param dataLen
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen) { static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) {
SDBFile *pDBFile = &pSmaH->dFile;
// TODO: insert sma data blocks into B+Tree // TODO: insert sma data blocks into B+Tree
tsdbDebug("insert sma data blocks into B+Tree: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d",
*(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); REPO_ID(pSmaH->pTsdb), pDBFile->path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
*(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen);
if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) {
return TSDB_CODE_FAILED;
}
#ifdef SMA_PRINT_DEBUG_LOG
uint32_t valueSize = 0;
void * data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
ASSERT(data != NULL);
for (uint32_t v = 0; v < valueSize; v += 8) {
tsdbWarn("vgId:%d sma data - val[%d] is %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
}
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -324,41 +475,41 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit
} }
} }
switch (intervalUnit) { switch (precision) {
case TD_TIME_UNIT_MILLISEC: case TSDB_TIME_PRECISION_MILLI:
if (TSDB_TIME_PRECISION_MILLI == precision) { if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us
return interval;
} else if (TSDB_TIME_PRECISION_MICRO == precision) {
return interval * 1e3;
} else { // nano second
return interval * 1e6;
}
break;
case TD_TIME_UNIT_MICROSEC:
if (TSDB_TIME_PRECISION_MILLI == precision) {
return interval / 1e3; return interval / 1e3;
} else if (TSDB_TIME_PRECISION_MICRO == precision) { } else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
return interval;
} else { // nano second
return interval * 1e3;
}
break;
case TD_TIME_UNIT_NANOSEC:
if (TSDB_TIME_PRECISION_MILLI == precision) {
return interval / 1e6; return interval / 1e6;
} else if (TSDB_TIME_PRECISION_MICRO == precision) { } else {
return interval / 1e3;
} else { // nano second
return interval; return interval;
} }
break; break;
default: case TSDB_TIME_PRECISION_MICRO:
if (TSDB_TIME_PRECISION_MILLI == precision) { if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us
return interval;
} else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
return interval / 1e3;
} else {
return interval * 1e3; return interval * 1e3;
} else if (TSDB_TIME_PRECISION_MICRO == precision) { }
break;
case TSDB_TIME_PRECISION_NANO:
if (TD_TIME_UNIT_MICROSEC == intervalUnit) {
return interval * 1e3;
} else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
return interval;
} else {
return interval * 1e6; return interval * 1e6;
} else { // nano second }
return interval * 1e9; break;
default: // ms
if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us
return interval / 1e3;
} else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
return interval / 1e6;
} else {
return interval;
} }
break; break;
} }
@ -381,8 +532,6 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
// TODO: check the data integrity // TODO: check the data integrity
void *bTree = pSmaH->pDFile;
int32_t len = 0; int32_t len = 0;
while (true) { while (true) {
if (len >= pData->dataLen) { if (len >= pData->dataLen) {
@ -405,7 +554,7 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId); pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId);
#endif #endif
tsdbEncodeTSmaKey(pTbData->tableUid, pColData->colId, pData->skey, (void **)&pSmaKey); tsdbEncodeTSmaKey(pTbData->tableUid, pColData->colId, pData->skey, (void **)&pSmaKey);
if (tsdbInsertTSmaBlocks(bTree, smaKey, pColData->data, pColData->blockSize) < 0) { if (tsdbInsertTSmaBlocks(pSmaH, smaKey, SMA_KEY_LEN, pColData->data, pColData->blockSize) < 0) {
tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
} }
tbLen += (sizeof(STSmaColData) + pColData->blockSize); tbLen += (sizeof(STSmaColData) + pColData->blockSize);
@ -419,15 +568,43 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) {
pSmaH->pTsdb = pTsdb; pSmaH->pTsdb = pTsdb;
pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision);
return TSDB_CODE_SUCCESS;
}
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
if (pSmaH) {
tsdbCloseDBF(&pSmaH->dFile);
}
} }
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) { static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) {
// TODO STsdb *pTsdb = pSmaH->pTsdb;
pSmaH->pDFile = "tSma_interval_file_name"; ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL);
char tSmaFile[TSDB_FILENAME_LEN] = {0};
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid);
pSmaH->dFile.path = strdup(tSmaFile);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
* @brief
*
* @param pTsdb
* @param interval Interval calculated by DB's precision
* @param storageLevel
* @return int32_t
*/
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) {
STsdbCfg *pCfg = REPO_CFG(pTsdb);
int32_t daysPerFile = pCfg->daysPerFile;
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]);
daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS;
}
return daysPerFile;
}
/** /**
* @brief Insert/Update Time-range-wise SMA data. * @brief Insert/Update Time-range-wise SMA data.
@ -441,51 +618,72 @@ static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData,
* @param msg * @param msg
* @return int32_t * @return int32_t
*/ */
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb); STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
STSmaWriteH tSmaH = {0};
tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData); if (!pTsdb->pTSmaEnv) {
terrno = TSDB_CODE_INVALID_PTR;
tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
return terrno;
}
if (pData->dataLen <= 0) { if (pData->dataLen <= 0) {
TASSERT(0); TASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return TSDB_CODE_FAILED;
} }
// Step 1: Judge the storage level STSmaWriteH tSmaH = {0};
int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit);
int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) {
return TSDB_CODE_FAILED;
}
// Step 1: Judge the storage level and days
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file // - Set and open the DFile or the B+Tree file
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
// Save all the TSma data to one file
// TODO: tsdbStartTSmaCommit(); // TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
tsdbInsertTSmaDataSection(&tSmaH, pData); if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
return TSDB_CODE_FAILED;
}
if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
return TSDB_CODE_FAILED;
}
// TODO:tsdbEndTSmaCommit(); // TODO:tsdbEndTSmaCommit();
// reset the SSmaStat // Step 3: reset the SSmaStat
tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey); tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
tsdbDestroyTSmaWriteH(&tSmaH);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) { static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) {
// TODO STsdb *pTsdb = pSmaH->pTsdb;
pSmaH->pDFile = "rSma_interval_file_name";
char tSmaFile[TSDB_FILENAME_LEN] = {0};
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid);
pSmaH->dFile.path = strdup(tSmaFile);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb); STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
STSmaWriteH tSmaH = {0}; STSmaWriteH tSmaH = {0};
tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData); tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData);
@ -496,7 +694,7 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
} }
// Step 1: Judge the storage level // Step 1: Judge the storage level
int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit); int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile;
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
@ -507,45 +705,46 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
// Save all the TSma data to one file // Save all the TSma data to one file
// TODO: tsdbStartTSmaCommit(); // TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
tsdbInsertTSmaDataSection(&tSmaH, pData); tsdbInsertTSmaDataSection(&tSmaH, pData);
// TODO:tsdbEndTSmaCommit(); // TODO:tsdbEndTSmaCommit();
// reset the SSmaStat // reset the SSmaStat
tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey); tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/** /**
* @brief Init of tSma ReadH * @brief
* *
* @param pSmaH * @param pSmaH
* @param pTsdb * @param pTsdb
* @param param * @param interval
* @param pData * @param intervalUnit
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit) {
pSmaH->pTsdb = pTsdb; pSmaH->pTsdb = pTsdb;
pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision);
// pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); pSmaH->storageLevel = tsdbGetSmaStorageLevel(interval, intervalUnit);
pSmaH->days = tsdbGetTSmaDays(pTsdb, pSmaH->interval, pSmaH->storageLevel);
} }
/** /**
* @brief Init of tSma FS * @brief Init of tSma FS
* *
* @param pReadH * @param pReadH
* @param param * @param skey
* @param queryWin
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey) {
int32_t storageLevel = 0; //tsdbJudgeStorageLevel(param->interval, param->intervalUnit); int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pSmaH->pTsdb)->precision));
int32_t daysPerFile = char tSmaFile[TSDB_FILENAME_LEN] = {0};
storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile; snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pSmaH->pTsdb), fid);
pReadH->storageLevel = storageLevel; pSmaH->dFile.path = strdup(tSmaFile);
pReadH->days = daysPerFile; pSmaH->smaFsIter.iter = 0;
pReadH->smaFsIter.iter = 0; pSmaH->smaFsIter.fid = fid;
} }
/** /**
@ -557,17 +756,18 @@ static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) {
* @return true * @return true
* @return false * @return false
*/ */
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf; SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf;
int32_t nSmaFs = taosArrayGetSize(smaFs); int32_t nSmaFs = taosArrayGetSize(smaFs);
pReadH->pDFile = NULL; tsdbCloseDBF(&pReadH->dFile);
#if 0
while (pReadH->smaFsIter.iter < nSmaFs) { while (pReadH->smaFsIter.iter < nSmaFs) {
void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter); void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter);
if (pSmaFile) { // match(indexName, queryWindow) if (pSmaFile) { // match(indexName, queryWindow)
// TODO: select the file by index_name ... // TODO: select the file by index_name ...
pReadH->pDFile = pSmaFile; pReadH->dFile = pSmaFile;
++pReadH->smaFsIter.iter; ++pReadH->smaFsIter.iter;
break; break;
} }
@ -578,41 +778,83 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) {
tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]"); tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]");
return true; return true;
} }
#endif
return false; return false;
} }
/** /**
* @brief Return the data between queryWin and fill the pData. * @brief
* *
* @param pTsdb * @param pTsdb Return the data between queryWin and fill the pData.
* @param param
* @param pData * @param pData
* @param queryWin * @param indexUid
* @param interval
* @param intervalUnit
* @param tableUid
* @param colId
* @param pQuerySKey
* @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM. * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
* @return int32_t * @return int32_t
*/ */
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult) { static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
SSmaStatItem *pItem = int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey,
(SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &pData->indexUid, sizeof(pData->indexUid)); int32_t nMaxResult) {
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
if (pItem == NULL) { if (pItem == NULL) {
// mark all window as expired and notify query module to query raw TS data. // mark all window as expired and notify query module to query raw TS data.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t nQueryWin = 0; #if 0
int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
for (int32_t n = 0; n < nQueryWin; ++n) { for (int32_t n = 0; n < nQueryWin; ++n) {
TSKEY thisWindow = n; TSKEY skey = taosArrayGet(pQuerySKey, n);
if (taosHashGet(pItem->expiredWindows, &thisWindow, sizeof(thisWindow)) != NULL) { if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY)) != NULL) {
// TODO: mark this window as expired. // TODO: mark this window as expired.
} }
} }
#endif
#if 0
if (taosHashGet(pItem->expiredWindows, &querySkey, sizeof(TSKEY)) != NULL) {
// TODO: mark this window as expired.
}
#endif
STSmaReadH tReadH = {0}; STSmaReadH tReadH = {0};
tsdbInitTSmaReadH(&tReadH, pTsdb, pData); tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit);
tsdbCloseDBF(&tReadH.dFile);
tsdbInitTSmaFile(&tReadH, queryWin); tsdbInitTSmaFile(&tReadH, querySkey);
if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) {
tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno));
return TSDB_CODE_FAILED;
}
char smaKey[SMA_KEY_LEN] = {0};
void *pSmaKey = &smaKey;
tsdbEncodeTSmaKey(tableUid, colId, querySkey, (void **)&pSmaKey);
tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb),
tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
*(int64_t *)POINTER_SHIFT(smaKey, 10), SMA_KEY_LEN);
void * result = NULL;
uint32_t valueSize = 0;
if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) {
tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64
" since %s",
REPO_ID(pTsdb), indexUid, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
*(int64_t *)POINTER_SHIFT(smaKey, 10), tstrerror(terrno));
tsdbCloseDBF(&tReadH.dFile);
return TSDB_CODE_FAILED;
}
tfree(result);
#ifdef SMA_PRINT_DEBUG_LOG
for (uint32_t v = 0; v < valueSize; v += 8) {
tsdbWarn("vgId:%d v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v));
}
#endif
#if 0
int32_t nResult = 0; int32_t nResult = 0;
int64_t lastKey = 0; int64_t lastKey = 0;
@ -634,8 +876,9 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *
} }
} }
} }
#endif
// read data from file and fill the result // read data from file and fill the result
tsdbCloseDBF(&tReadH.dFile);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -673,4 +916,55 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) {
// } // }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif #endif
/**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
* TODO: Who is responsible for resource allocate and release?
*/
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) {
tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
/**
* @brief Insert Time-range-wise Rollup Sma(RSma) data
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
*/
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, int32_t nMaxResult) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySkey,
nMaxResult)) < 0) {
tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}

View File

@ -34,6 +34,7 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL); return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL);
} }
#if 0
/** /**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine * @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine
* *
@ -51,6 +52,14 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
return code; return code;
} }
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) {
tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
/** /**
* @brief Insert Time-range-wise Rollup Sma(RSma) data * @brief Insert Time-range-wise Rollup Sma(RSma) data
* *
@ -65,4 +74,6 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
} }
return code; return code;
} }
#endif

View File

@ -33,7 +33,7 @@ int main(int argc, char **argv) {
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }
TEST(testCase, tSmaEncodeDecodeTest) { TEST(testCase, tSma_Meta_Encode_Decode_Test) {
// encode // encode
STSma tSma = {0}; STSma tSma = {0};
tSma.version = 0; tSma.version = 0;
@ -87,8 +87,9 @@ TEST(testCase, tSmaEncodeDecodeTest) {
tdDestroyTSma(&tSma); tdDestroyTSma(&tSma);
tdDestroyTSmaWrapper(&dstTSmaWrapper); tdDestroyTSmaWrapper(&dstTSmaWrapper);
} }
#if 1 #if 1
TEST(testCase, tSma_DB_Put_Get_Del_Test) { TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
const char * smaIndexName1 = "sma_index_test_1"; const char * smaIndexName1 = "sma_index_test_1";
const char * smaIndexName2 = "sma_index_test_2"; const char * smaIndexName2 = "sma_index_test_2";
const char * timezone = "Asia/Shanghai"; const char * timezone = "Asia/Shanghai";
@ -220,13 +221,84 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
#endif #endif
#if 1 #if 1
TEST(testCase, tSmaInsertTest) { TEST(testCase, tSma_Data_Insert_Query_Test) {
const int64_t indexUid = 2000000002; // step 1: prepare meta
const char * smaIndexName1 = "sma_index_test_1";
const char * timezone = "Asia/Shanghai";
const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;";
const char * tagsFilter = "where tags.location='Beijing' and tags.district='ChaoYang'";
const char * smaTestDir = "./smaTest";
const tb_uid_t tbUid = 1234567890;
const int64_t indexUid1 = 2000000001;
const int64_t interval1 = 1;
const int8_t intervalUnit1 = TD_TIME_UNIT_DAY;
const uint32_t nCntTSma = 2;
TSKEY skey1 = 1646987196;
const int64_t testSmaData1 = 100;
const int64_t testSmaData2 = 200;
// encode
STSma tSma = {0};
tSma.version = 0;
tSma.intervalUnit = TD_TIME_UNIT_DAY;
tSma.interval = 1;
tSma.slidingUnit = TD_TIME_UNIT_HOUR;
tSma.sliding = 0;
tSma.indexUid = indexUid1;
tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN);
tstrncpy(tSma.timezone, timezone, TD_TIMEZONE_LEN);
tSma.tableUid = tbUid;
tSma.exprLen = strlen(expr);
tSma.expr = (char *)calloc(tSma.exprLen + 1, 1);
tstrncpy(tSma.expr, expr, tSma.exprLen + 1);
tSma.tagsFilterLen = strlen(tagsFilter);
tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1);
tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1);
SMeta * pMeta = NULL;
STSma * pSmaCfg = &tSma;
const SMetaCfg *pMetaCfg = &defaultMetaOptions;
taosRemoveDir(smaTestDir);
pMeta = metaOpen(smaTestDir, pMetaCfg, NULL);
assert(pMeta != NULL);
// save index 1
EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
// step 2: insert data
STSmaDataWrapper *pSmaData = NULL; STSmaDataWrapper *pSmaData = NULL;
STsdb tsdb = {0}; STsdb tsdb = {0};
STsdbCfg * pCfg = &tsdb.config; STsdbCfg * pCfg = &tsdb.config;
pCfg->daysPerFile = 1; tsdb.pMeta = pMeta;
tsdb.vgId = 2;
tsdb.config.daysPerFile = 10; // default days is 10
tsdb.config.keep1 = 30;
tsdb.config.keep2 = 90;
tsdb.config.keep = 365;
tsdb.config.precision = TSDB_TIME_PRECISION_MILLI;
tsdb.config.update = TD_ROW_OVERWRITE_UPDATE;
tsdb.config.compression = TWO_STAGE_COMP;
switch (tsdb.config.precision) {
case TSDB_TIME_PRECISION_MILLI:
skey1 *= 1e3;
break;
case TSDB_TIME_PRECISION_MICRO:
skey1 *= 1e6;
break;
case TSDB_TIME_PRECISION_NANO:
skey1 *= 1e9;
break;
default: // ms
skey1 *= 1e3;
break;
}
char *msg = (char *)calloc(100, 1);
EXPECT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
// init // init
int32_t allocCnt = 0; int32_t allocCnt = 0;
@ -235,21 +307,21 @@ TEST(testCase, tSmaInsertTest) {
void * buf = NULL; void * buf = NULL;
EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0); EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0);
int32_t bufSize = taosTSizeof(buf); int32_t bufSize = taosTSizeof(buf);
int32_t numOfTables = 25; int32_t numOfTables = 10;
col_id_t numOfCols = 4096; col_id_t numOfCols = 4096;
EXPECT_GT(numOfCols, 0); EXPECT_GT(numOfCols, 0);
pSmaData = (STSmaDataWrapper *)buf; pSmaData = (STSmaDataWrapper *)buf;
printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData); printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData);
pSmaData->skey = 1646987196; pSmaData->skey = skey1;
pSmaData->interval = 10; pSmaData->interval = interval1;
pSmaData->intervalUnit = TD_TIME_UNIT_MINUTE; pSmaData->intervalUnit = intervalUnit1;
pSmaData->indexUid = indexUid; pSmaData->indexUid = indexUid1;
int32_t len = sizeof(STSmaDataWrapper); int32_t len = sizeof(STSmaDataWrapper);
for (int32_t t = 0; t < numOfTables; ++t) { for (int32_t t = 0; t < numOfTables; ++t) {
STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len); STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len);
pTbData->tableUid = t; pTbData->tableUid = tbUid + t;
int32_t tableDataLen = sizeof(STSmaTbData); int32_t tableDataLen = sizeof(STSmaTbData);
for (col_id_t c = 0; c < numOfCols; ++c) { for (col_id_t c = 0; c < numOfCols; ++c) {
@ -262,8 +334,17 @@ TEST(testCase, tSmaInsertTest) {
} }
STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pSmaData, len + tableDataLen); STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pSmaData, len + tableDataLen);
pColData->colId = c + PRIMARYKEY_TIMESTAMP_COL_ID; pColData->colId = c + PRIMARYKEY_TIMESTAMP_COL_ID;
pColData->blockSize = ((c & 1) == 0) ? 8 : 16;
// TODO: fill col data // TODO: fill col data
if ((c & 1) == 0) {
pColData->blockSize = 8;
memcpy(pColData->data, &testSmaData1, 8);
} else {
pColData->blockSize = 16;
memcpy(pColData->data, &testSmaData1, 8);
memcpy(POINTER_SHIFT(pColData->data, 8), &testSmaData2, 8);
}
tableDataLen += (sizeof(STSmaColData) + pColData->blockSize); tableDataLen += (sizeof(STSmaColData) + pColData->blockSize);
} }
pTbData->dataLen = (tableDataLen - sizeof(STSmaTbData)); pTbData->dataLen = (tableDataLen - sizeof(STSmaTbData));
@ -277,8 +358,24 @@ TEST(testCase, tSmaInsertTest) {
// execute // execute
EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
// release // step 3: query
uint32_t checkDataCnt = 0;
for (int32_t t = 0; t < numOfTables; ++t) {
for (col_id_t c = 0; c < numOfCols; ++c) {
EXPECT_EQ(tsdbGetTSmaData(&tsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t,
c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1),
TSDB_CODE_SUCCESS);
++checkDataCnt;
}
}
printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt);
// release data
taosTZfree(buf); taosTZfree(buf);
// release meta
tdDestroyTSma(&tSma);
metaClose(pMeta);
} }
#endif #endif

View File

@ -183,6 +183,12 @@ static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode
return (SNode*)pDst; return (SNode*)pDst;
} }
static SNode* fillNodeCopy(const SFillNode* pSrc, SFillNode* pDst) {
COPY_SCALAR_FIELD(mode);
CLONE_NODE_FIELD(pValues);
return (SNode*)pDst;
}
static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) { static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
COPY_SCALAR_FIELD(id); COPY_SCALAR_FIELD(id);
CLONE_NODE_LIST_FIELD(pTargets); CLONE_NODE_LIST_FIELD(pTargets);
@ -248,6 +254,17 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo
return (SNode*)pDst; return (SNode*)pDst;
} }
static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(winType);
CLONE_NODE_LIST_FIELD(pFuncs);
COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding);
CLONE_NODE_FIELD(pFill);
return (SNode*)pDst;
}
static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) { static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) {
CLONE_NODE_FIELD(pNode); CLONE_NODE_FIELD(pNode);
COPY_SCALAR_FIELD(subplanType); COPY_SCALAR_FIELD(subplanType);
@ -309,6 +326,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
case QUERY_NODE_ORDER_BY_EXPR: case QUERY_NODE_ORDER_BY_EXPR:
case QUERY_NODE_LIMIT: case QUERY_NODE_LIMIT:
break; break;
case QUERY_NODE_FILL:
return fillNodeCopy((const SFillNode*)pNode, (SFillNode*)pDst);
case QUERY_NODE_DATABLOCK_DESC: case QUERY_NODE_DATABLOCK_DESC:
return dataBlockDescCopy((const SDataBlockDescNode*)pNode, (SDataBlockDescNode*)pDst); return dataBlockDescCopy((const SDataBlockDescNode*)pNode, (SDataBlockDescNode*)pDst);
case QUERY_NODE_SLOT_DESC: case QUERY_NODE_SLOT_DESC:
@ -325,6 +344,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicVnodeModifCopy((const SVnodeModifLogicNode*)pNode, (SVnodeModifLogicNode*)pDst); return logicVnodeModifCopy((const SVnodeModifLogicNode*)pNode, (SVnodeModifLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE: case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst); return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst);
case QUERY_NODE_LOGIC_SUBPLAN: case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst); return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst);
default: default:

View File

@ -117,6 +117,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiExchange"; return "PhysiExchange";
case QUERY_NODE_PHYSICAL_PLAN_SORT: case QUERY_NODE_PHYSICAL_PLAN_SORT:
return "PhysiSort"; return "PhysiSort";
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return "PhysiInterval";
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return "PhysiDispatch"; return "PhysiDispatch";
case QUERY_NODE_PHYSICAL_PLAN_INSERT: case QUERY_NODE_PHYSICAL_PLAN_INSERT:
@ -573,6 +575,65 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
return code; return code;
} }
static const char* jkIntervalPhysiPlanExprs = "Exprs";
static const char* jkIntervalPhysiPlanFuncs = "Funcs";
static const char* jkIntervalPhysiPlanInterval = "Interval";
static const char* jkIntervalPhysiPlanOffset = "Offset";
static const char* jkIntervalPhysiPlanSliding = "Sliding";
static const char* jkIntervalPhysiPlanFill = "Fill";
static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkIntervalPhysiPlanExprs, pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkIntervalPhysiPlanFuncs, pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanInterval, pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanOffset, pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanSliding, pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill);
}
return code;
}
static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
SIntervalPhysiNode* pNode = (SIntervalPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkIntervalPhysiPlanExprs, &pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkIntervalPhysiPlanFuncs, &pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanInterval, &pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanOffset, &pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanSliding, &pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill);
}
return code;
}
static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc"; static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc";
static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) { static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) {
@ -1500,6 +1561,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiExchangeNodeToJson(pObj, pJson); return physiExchangeNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SORT: case QUERY_NODE_PHYSICAL_PLAN_SORT:
break; break;
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return physiIntervalNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return physiDispatchNodeToJson(pObj, pJson); return physiDispatchNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INSERT: case QUERY_NODE_PHYSICAL_PLAN_INSERT:

View File

@ -134,6 +134,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SVnodeModifLogicNode)); return makeNode(type, sizeof(SVnodeModifLogicNode));
case QUERY_NODE_LOGIC_PLAN_EXCHANGE: case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangeLogicNode)); return makeNode(type, sizeof(SExchangeLogicNode));
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return makeNode(type, sizeof(SWindowLogicNode));
case QUERY_NODE_LOGIC_SUBPLAN: case QUERY_NODE_LOGIC_SUBPLAN:
return makeNode(type, sizeof(SSubLogicPlan)); return makeNode(type, sizeof(SSubLogicPlan));
case QUERY_NODE_LOGIC_PLAN: case QUERY_NODE_LOGIC_PLAN:
@ -156,6 +158,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SExchangePhysiNode)); return makeNode(type, sizeof(SExchangePhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SORT: case QUERY_NODE_PHYSICAL_PLAN_SORT:
return makeNode(type, sizeof(SNode)); return makeNode(type, sizeof(SNode));
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return makeNode(type, sizeof(SIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return makeNode(type, sizeof(SDataDispatcherNode)); return makeNode(type, sizeof(SDataDispatcherNode));
case QUERY_NODE_PHYSICAL_PLAN_INSERT: case QUERY_NODE_PHYSICAL_PLAN_INSERT:

View File

@ -198,7 +198,7 @@ col_name(A) ::= column_name(B).
cmd ::= SHOW VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, NULL); } cmd ::= SHOW VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, NULL); }
cmd ::= SHOW db_name(B) NK_DOT VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, &B); } cmd ::= SHOW db_name(B) NK_DOT VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, &B); }
/************************************************ show vgroups ********************************************************/ /************************************************ show mnodes *********************************************************/
cmd ::= SHOW MNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MNODES_STMT, NULL); } cmd ::= SHOW MNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MNODES_STMT, NULL); }
/************************************************ select **************************************************************/ /************************************************ select **************************************************************/

View File

@ -706,9 +706,17 @@ static int32_t translateGroupBy(STranslateContext* pCxt, SNodeList* pGroupByList
return translateExprList(pCxt, pGroupByList); return translateExprList(pCxt, pGroupByList);
} }
static int32_t doTranslateWindow(STranslateContext* pCxt, SNode* pWindow) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateWindow(STranslateContext* pCxt, SNode* pWindow) { static int32_t translateWindow(STranslateContext* pCxt, SNode* pWindow) {
pCxt->currClause = SQL_CLAUSE_WINDOW; pCxt->currClause = SQL_CLAUSE_WINDOW;
return translateExpr(pCxt, pWindow); int32_t code = translateExpr(pCxt, pWindow);
if (TSDB_CODE_SUCCESS == code) {
code = doTranslateWindow(pCxt, pWindow);
}
return code;
} }
static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) { static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) {

View File

@ -183,6 +183,13 @@ TEST_F(ParserTest, selectClause) {
ASSERT_TRUE(run()); ASSERT_TRUE(run());
} }
TEST_F(ParserTest, selectWindow) {
setDatabase("root", "test");
bind("SELECT count(*) FROM t1 interval(10s)");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, selectSyntaxError) { TEST_F(ParserTest, selectSyntaxError) {
setDatabase("root", "test"); setDatabase("root", "test");

View File

@ -304,6 +304,50 @@ static SLogicNode* createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
return (SLogicNode*)pAgg; return (SLogicNode*)pAgg;
} }
static SLogicNode* createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SIntervalWindowNode* pInterval, SSelectStmt* pSelect) {
SWindowLogicNode* pWindow = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW);
CHECK_ALLOC(pWindow, NULL);
pWindow->node.id = pCxt->planNodeId++;
pWindow->winType = WINDOW_TYPE_INTERVAL;
pWindow->interval = ((SValueNode*)pInterval->pInterval)->datum.i;
pWindow->offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0);
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : 0);
if (NULL != pInterval->pFill) {
pWindow->pFill = nodesCloneNode(pInterval->pFill);
CHECK_ALLOC(pWindow->pFill, (SLogicNode*)pWindow);
}
SNodeList* pFuncs = NULL;
CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pFuncs), NULL);
if (NULL != pFuncs) {
pWindow->pFuncs = nodesCloneList(pFuncs);
CHECK_ALLOC(pWindow->pFuncs, (SLogicNode*)pWindow);
}
CHECK_CODE(rewriteExpr(pWindow->node.id, 1, pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW), (SLogicNode*)pWindow);
pWindow->node.pTargets = createColumnByRewriteExps(pCxt, pWindow->pFuncs);
CHECK_ALLOC(pWindow->node.pTargets, (SLogicNode*)pWindow);
return (SLogicNode*)pWindow;
}
static SLogicNode* createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow) {
return NULL;
}
switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_INTERVAL_WINDOW:
return createWindowLogicNodeByInterval(pCxt, (SIntervalWindowNode*)pSelect->pWindow, pSelect);
default:
break;
}
return NULL;
}
static SNodeList* createColumnByProjections(SLogicPlanContext* pCxt, SNodeList* pExprs) { static SNodeList* createColumnByProjections(SLogicPlanContext* pCxt, SNodeList* pExprs) {
SNodeList* pList = nodesMakeList(); SNodeList* pList = nodesMakeList();
CHECK_ALLOC(pList, NULL); CHECK_ALLOC(pList, NULL);
@ -345,6 +389,9 @@ static SLogicNode* createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
pRoot->pConditions = nodesCloneNode(pSelect->pWhere); pRoot->pConditions = nodesCloneNode(pSelect->pWhere);
CHECK_ALLOC(pRoot->pConditions, pRoot); CHECK_ALLOC(pRoot->pConditions, pRoot);
} }
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createWindowLogicNode(pCxt, pSelect));
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) { if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect)); pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect));
} }

View File

@ -473,14 +473,58 @@ static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLog
return (SPhysiNode*)pExchange; return (SPhysiNode*)pExchange;
} }
static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) {
SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_INTERVAL);
CHECK_ALLOC(pInterval, NULL);
pInterval->interval = pWindowLogicNode->interval;
pInterval->offset = pWindowLogicNode->offset;
pInterval->sliding = pWindowLogicNode->sliding;
pInterval->pFill = nodesCloneNode(pWindowLogicNode->pFill);
SNodeList* pPrecalcExprs = NULL;
SNodeList* pFuncs = NULL;
CHECK_CODE(rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs), (SPhysiNode*)pInterval);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
if (NULL != pPrecalcExprs) {
pInterval->pExprs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs);
CHECK_ALLOC(pInterval->pExprs, (SPhysiNode*)pInterval);
CHECK_CODE(addDataBlockDesc(pCxt, pInterval->pExprs, pChildTupe), (SPhysiNode*)pInterval);
}
if (NULL != pFuncs) {
pInterval->pFuncs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs);
CHECK_ALLOC(pInterval->pFuncs, (SPhysiNode*)pInterval);
CHECK_CODE(addDataBlockDesc(pCxt, pInterval->pFuncs, pInterval->node.pOutputDataBlockDesc), (SPhysiNode*)pInterval);
}
CHECK_CODE(setSlotOutput(pCxt, pWindowLogicNode->node.pTargets, pInterval->node.pOutputDataBlockDesc), (SPhysiNode*)pInterval);
return (SPhysiNode*)pInterval;
}
static SPhysiNode* createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) {
switch (pWindowLogicNode->winType) {
case WINDOW_TYPE_INTERVAL:
return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode);
case WINDOW_TYPE_SESSION:
case WINDOW_TYPE_STATE:
break;
default:
break;
}
return NULL;
}
static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SLogicNode* pLogicPlan) { static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SLogicNode* pLogicPlan) {
SNodeList* pChildren = nodesMakeList(); SNodeList* pChildren = nodesMakeList();
CHECK_ALLOC(pChildren, NULL); CHECK_ALLOC(pChildren, NULL);
SNode* pLogicChild; SNode* pLogicChild;
FOREACH(pLogicChild, pLogicPlan->pChildren) { FOREACH(pLogicChild, pLogicPlan->pChildren) {
SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, pSubplan, (SLogicNode*)pLogicChild); if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pChildren, createPhysiNode(pCxt, pSubplan, (SLogicNode*)pLogicChild))) {
if (TSDB_CODE_SUCCESS != nodesListAppend(pChildren, pChildPhyNode)) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
nodesDestroyList(pChildren); nodesDestroyList(pChildren);
return NULL; return NULL;
@ -504,6 +548,9 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
case QUERY_NODE_LOGIC_PLAN_EXCHANGE: case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
pPhyNode = createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicPlan); pPhyNode = createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicPlan);
break; break;
case QUERY_NODE_LOGIC_PLAN_WINDOW:
pPhyNode = createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicPlan);
break;
default: default:
break; break;
} }

View File

@ -166,3 +166,10 @@ TEST_F(PlannerTest, subquery) {
bind("SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b"); bind("SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b");
ASSERT_TRUE(run()); ASSERT_TRUE(run());
} }
TEST_F(PlannerTest, interval) {
setDatabase("root", "test");
bind("SELECT count(*) FROM t1 interval(10s)");
ASSERT_TRUE(run());
}

View File

@ -120,6 +120,10 @@ typedef struct {
// SEpSet* pSet; // for synchronous API // SEpSet* pSet; // for synchronous API
} SRpcReqContext; } SRpcReqContext;
typedef SRpcMsg STransMsg;
typedef SRpcInfo STrans;
typedef SRpcConnInfo STransHandleInfo;
typedef struct { typedef struct {
SEpSet epSet; // ip list provided by app SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app void* ahandle; // handle provided by app
@ -134,8 +138,8 @@ typedef struct {
int8_t connType; // connection type int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API tsem_t* pSem; // for synchronous API
int hThrdIdx; int hThrdIdx;
char* ip; char* ip;
@ -249,4 +253,15 @@ void transUnrefSrvHandle(void* handle);
void transRefCliHandle(void* handle); void transRefCliHandle(void* handle);
void transUnrefCliHandle(void* handle); void transUnrefCliHandle(void* handle);
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg);
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
void transSendResponse(const STransMsg* pMsg);
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void transCloseClient(void* arg);
void transCloseServer(void* arg);
#endif #endif

View File

@ -64,6 +64,7 @@ typedef struct {
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
bool (*pfp)(void* parent, tmsg_t msgType); bool (*pfp)(void* parent, tmsg_t msgType);
void* (*mfp)(void* parent, tmsg_t msgType);
int32_t refCount; int32_t refCount;
void* parent; void* parent;

View File

@ -18,8 +18,9 @@
#include "transComm.h" #include "transComm.h"
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
taosInitServer, taosInitClient}; transInitServer, transInitClient};
void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient};
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
void* rpcOpen(const SRpcInit* pInit) { void* rpcOpen(const SRpcInit* pInit) {
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
@ -34,11 +35,12 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->pfp = pInit->pfp; pRpc->pfp = pInit->pfp;
pRpc->mfp = pInit->mfp;
if (pInit->connType == TAOS_CONN_SERVER) { if (pInit->connType == TAOS_CONN_SERVER) {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
} else { } else {
pRpc->numOfThreads = pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
} }
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
@ -116,6 +118,24 @@ int32_t rpcInit() {
return 0; return 0;
} }
void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg* pMsg, int64_t *pRid) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
transSendRequest(shandle, ip, port, pMsg);
}
void rpcSendRecv(void* shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
transSendRecv(shandle, ip, port, pMsg, pRsp);
}
void rpcSendResponse(const SRpcMsg *pMsg) {
transSendResponse(pMsg);
}
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
return transGetConnInfo((void *)thandle, pInfo);
}
void rpcCleanup(void) { void rpcCleanup(void) {
// impl later // impl later
// //
@ -129,6 +149,7 @@ void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosRefHandle[type])(handle); (*taosRefHandle[type])(handle);
} }
void rpcUnrefHandle(void* handle, int8_t type) { void rpcUnrefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosUnRefHandle[type])(handle); (*taosUnRefHandle[type])(handle);

View File

@ -42,7 +42,7 @@ typedef struct SCliConn {
typedef struct SCliMsg { typedef struct SCliMsg {
STransConnCtx* ctx; STransConnCtx* ctx;
SRpcMsg msg; STransMsg msg;
queue q; queue q;
uint64_t st; uint64_t st;
} SCliMsg; } SCliMsg;
@ -105,9 +105,9 @@ static void cliHandleExcept(SCliConn* conn);
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void cliSendQuit(SCliThrdObj* thrd); static void cliSendQuit(SCliThrdObj* thrd);
static void destroyUserdata(SRpcMsg* userdata); static void destroyUserdata(STransMsg* userdata);
static int cliRBChoseIdx(SRpcInfo* pTransInst); static int cliRBChoseIdx(STrans* pTransInst);
static void destroyCmsg(SCliMsg* cmsg); static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx); static void transDestroyConnCtx(STransConnCtx* ctx);
@ -118,11 +118,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
do { \ do { \
if (thrd->quit) { \ if (thrd->quit) { \
cliHandleExcept(conn); \ cliHandleExcept(conn); \
goto _RETURE; \ goto _RETURE; \
} \ } \
} while (0) } while (0)
@ -130,20 +130,25 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_HANDLE_BROKEN(conn) \ #define CONN_HANDLE_BROKEN(conn) \
do { \ do { \
if (conn->broken) { \ if (conn->broken) { \
cliHandleExcept(conn); \ cliHandleExcept(conn); \
goto _RETURE; \ goto _RETURE; \
} \ } \
} while (0); } while (0);
#define CONN_SET_PERSIST_BY_APP(conn) \
do { \
if (conn->persist == false) { \
conn->persist = true; \
transRefCliHandle(conn); \
} \
} while (0)
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false)
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
static void* cliNotifyApp() {} void cliHandleResp(SCliConn* conn) {
static void cliHandleResp(SCliConn* conn) {
SCliMsg* pMsg = conn->data;
STransConnCtx* pCtx = pMsg->ctx;
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
SRpcInfo* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
@ -152,19 +157,29 @@ static void cliHandleResp(SCliConn* conn) {
// buf's mem alread translated to rpcMsg.pCont // buf's mem alread translated to rpcMsg.pCont
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
SRpcMsg rpcMsg = {0}; STransMsg rpcMsg = {0};
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = transContFromHead((char*)pHead); rpcMsg.pCont = transContFromHead((char*)pHead);
rpcMsg.code = pHead->code; rpcMsg.code = pHead->code;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = NULL;
if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { SCliMsg* pMsg = conn->data;
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL;
} else {
rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
}
// if (rpcMsg.ahandle == NULL) {
// tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn);
// return;
//}
if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) {
rpcMsg.handle = conn; rpcMsg.handle = conn;
transRefCliHandle(conn); CONN_SET_PERSIST_BY_APP(conn);
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
conn->persist = 1;
tDebug("cli conn %p persist by app", conn);
} }
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
@ -173,7 +188,7 @@ static void cliHandleResp(SCliConn* conn) {
conn->secured = pHead->secured; conn->secured = pHead->secured;
if (pCtx->pSem == NULL) { if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, conn); tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
} else { } else {
@ -184,8 +199,7 @@ static void cliHandleResp(SCliConn* conn) {
uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb);
// user owns conn->persist = 1 if (CONN_NO_PERSIST_BY_APP(conn)) {
if (conn->persist == 0) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
} }
destroyCmsg(conn->data); destroyCmsg(conn->data);
@ -196,24 +210,32 @@ static void cliHandleResp(SCliConn* conn) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
} }
static void cliHandleExcept(SCliConn* pConn) {
void cliHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) { if (pConn->data == NULL) {
// handle conn except in conn pool if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
return; return;
}
} }
SCliThrdObj* pThrd = pConn->hostThrd; SCliThrdObj* pThrd = pConn->hostThrd;
SRpcInfo* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
SRpcMsg rpcMsg = {0}; STransMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcMsg.msgType = pMsg->msg.msgType + 1; rpcMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
rpcMsg.ahandle = NULL;
if (pCtx->pSem == NULL) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL;
} else {
rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
}
if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
} else { } else {
@ -228,9 +250,9 @@ static void cliHandleExcept(SCliConn* pConn) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} }
static void cliTimeoutCb(uv_timer_t* handle) { void cliTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data; SCliThrdObj* pThrd = handle->data;
SRpcInfo* pRpc = pThrd->pTransInst; STrans* pRpc = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout; int64_t currentTime = pThrd->nextTimeout;
tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label); tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label);
@ -252,11 +274,12 @@ static void cliTimeoutCb(uv_timer_t* handle) {
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
static void* createConnPool(int size) {
void* createConnPool(int size) {
// thread local, no lock // thread local, no lock
return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
} }
static void* destroyConnPool(void* pool) { void* destroyConnPool(void* pool) {
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
while (connList != NULL) { while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conn)) { while (!QUEUE_IS_EMPTY(&connList->conn)) {
@ -301,7 +324,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; STrans* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
@ -358,6 +381,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
conn->persist = false;
conn->broken = false; conn->broken = false;
transRefCliHandle(conn); transRefCliHandle(conn);
return conn; return conn;
@ -395,16 +419,16 @@ static void cliSendCb(uv_write_t* req, int status) {
uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb);
} }
static void cliSend(SCliConn* pConn) { void cliSend(SCliConn* pConn) {
CONN_HANDLE_BROKEN(pConn); CONN_HANDLE_BROKEN(pConn);
SCliMsg* pCliMsg = pConn->data; SCliMsg* pCliMsg = pConn->data;
STransConnCtx* pCtx = pCliMsg->ctx; STransConnCtx* pCtx = pCliMsg->ctx;
SCliThrdObj* pThrd = pConn->hostThrd; SCliThrdObj* pThrd = pConn->hostThrd;
SRpcInfo* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
int msgLen = transMsgLenFromCont(pMsg->contLen); int msgLen = transMsgLenFromCont(pMsg->contLen);
@ -442,7 +466,8 @@ static void cliSend(SCliConn* pConn) {
_RETURE: _RETURE:
return; return;
} }
static void cliConnCb(uv_connect_t* req, int status) {
void cliConnCb(uv_connect_t* req, int status) {
// impl later // impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status != 0) { if (status != 0) {
@ -472,11 +497,11 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
pThrd->quit = true; pThrd->quit = true;
uv_stop(pThrd->loop); uv_stop(pThrd->loop);
} }
static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = NULL; SCliConn* conn = NULL;
if (pMsg->msg.handle != NULL) { if (pMsg->msg.handle != NULL) {
conn = (SCliConn*)(pMsg->msg.handle); conn = (SCliConn*)(pMsg->msg.handle);
transUnrefCliHandle(conn);
if (conn != NULL) { if (conn != NULL) {
tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn);
} }
@ -487,13 +512,14 @@ static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
} }
return conn; return conn;
} }
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs(); uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st; uint64_t el = et - pMsg->st;
tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el);
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SCliConn* conn = cliGetConn(pMsg, pThrd); SCliConn* conn = cliGetConn(pMsg, pThrd);
if (conn != NULL) { if (conn != NULL) {
@ -514,6 +540,7 @@ static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
} }
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
} }
static void cliAsyncCb(uv_async_t* handle) { static void cliAsyncCb(uv_async_t* handle) {
@ -522,7 +549,7 @@ static void cliAsyncCb(uv_async_t* handle) {
SCliMsg* pMsg = NULL; SCliMsg* pMsg = NULL;
// batch process to avoid to lock/unlock frequently // batch process to avoid to lock/unlock frequently
queue wq; queue wq;
pthread_mutex_lock(&item->mtx); pthread_mutex_lock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq); QUEUE_MOVE(&item->qmsg, &wq);
pthread_mutex_unlock(&item->mtx); pthread_mutex_unlock(&item->mtx);
@ -551,10 +578,10 @@ static void* cliWorkThread(void* arg) {
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
} }
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SCliObj* cli = calloc(1, sizeof(SCliObj)); SCliObj* cli = calloc(1, sizeof(SCliObj));
SRpcInfo* pRpc = shandle; STrans* pRpc = shandle;
memcpy(cli->label, label, strlen(label)); memcpy(cli->label, label, strlen(label));
cli->numOfThreads = numOfThreads; cli->numOfThreads = numOfThreads;
cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
@ -573,7 +600,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
return cli; return cli;
} }
static void destroyUserdata(SRpcMsg* userdata) { static void destroyUserdata(STransMsg* userdata) {
if (userdata->pCont == NULL) { if (userdata->pCont == NULL) {
return; return;
} }
@ -629,12 +656,20 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
free(ctx); free(ctx);
} }
// //
static void cliSendQuit(SCliThrdObj* thrd) { void cliSendQuit(SCliThrdObj* thrd) {
// cli can stop gracefully // cli can stop gracefully
SCliMsg* msg = calloc(1, sizeof(SCliMsg)); SCliMsg* msg = calloc(1, sizeof(SCliMsg));
transSendAsync(thrd->asyncPool, &msg->q); transSendAsync(thrd->asyncPool, &msg->q);
} }
void taosCloseClient(void* arg) { int cliRBChoseIdx(STrans* pTransInst) {
int64_t index = pTransInst->index;
if (pTransInst->index++ >= pTransInst->numOfThreads) {
pTransInst->index = 0;
}
return index % pTransInst->numOfThreads;
}
void transCloseClient(void* arg) {
SCliObj* cli = arg; SCliObj* cli = arg;
for (int i = 0; i < cli->numOfThreads; i++) { for (int i = 0; i < cli->numOfThreads; i++) {
cliSendQuit(cli->pThreadObj[i]); cliSendQuit(cli->pThreadObj[i]);
@ -643,13 +678,6 @@ void taosCloseClient(void* arg) {
free(cli->pThreadObj); free(cli->pThreadObj);
free(cli); free(cli);
} }
static int cliRBChoseIdx(SRpcInfo* pTransInst) {
int64_t index = pTransInst->index;
if (pTransInst->index++ >= pTransInst->numOfThreads) {
pTransInst->index = 0;
}
return index % pTransInst->numOfThreads;
}
void transRefCliHandle(void* handle) { void transRefCliHandle(void* handle) {
if (handle == NULL) { if (handle == NULL) {
return; return;
@ -665,17 +693,11 @@ void transUnrefCliHandle(void* handle) {
if (ref == 0) { if (ref == 0) {
cliDestroyConn((SCliConn*)handle, true); cliDestroyConn((SCliConn*)handle, true);
} }
// unref cli handle
} }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
// impl later
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
SRpcInfo* pTransInst = (SRpcInfo*)shandle; void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) {
STrans* pTransInst = (STrans*)shandle;
int index = CONN_HOST_THREAD_INDEX(pMsg->handle); int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle);
if (index == -1) { if (index == -1) {
index = cliRBChoseIdx(pTransInst); index = cliRBChoseIdx(pTransInst);
} }
@ -683,6 +705,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
// imp later // imp later
} }
tDebug("send request at thread:%d %p", index, pMsg);
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->ahandle = pMsg->ahandle; pCtx->ahandle = pMsg->ahandle;
pCtx->msgType = pMsg->msgType; pCtx->msgType = pMsg->msgType;
@ -701,14 +724,9 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
} }
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) {
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { STrans* pTransInst = (STrans*)shandle;
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); int index = CONN_HOST_THREAD_INDEX(pReq->handle);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
SRpcInfo* pTransInst = (SRpcInfo*)shandle;
int index = CONN_HOST_THREAD_INDEX(pReq->handle);
if (index == -1) { if (index == -1) {
index = cliRBChoseIdx(pTransInst); index = cliRBChoseIdx(pTransInst);
} }
@ -734,7 +752,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
tsem_wait(pSem); tsem_wait(pSem);
tsem_destroy(pSem); tsem_destroy(pSem);
free(pSem); free(pSem);
return;
} }
#endif #endif

View File

@ -37,8 +37,7 @@ typedef struct SSrvConn {
struct sockaddr_in addr; struct sockaddr_in addr;
struct sockaddr_in locaddr; struct sockaddr_in locaddr;
// SRpcMsg sendMsg;
// del later
char secured; char secured;
int spi; int spi;
char info[64]; char info[64];
@ -49,7 +48,7 @@ typedef struct SSrvConn {
typedef struct SSrvMsg { typedef struct SSrvMsg {
SSrvConn* pConn; SSrvConn* pConn;
SRpcMsg msg; STransMsg msg;
queue q; queue q;
} SSrvMsg; } SSrvMsg;
@ -207,20 +206,20 @@ static void uvHandleReq(SSrvConn* pConn) {
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
SRpcInfo* pRpc = (SRpcInfo*)p->shandle; STrans* pRpc = (STrans*)p->shandle;
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
int32_t dlen = 0; int32_t dlen = 0;
if (transDecompressMsg(NULL, 0, NULL)) { if (transDecompressMsg(NULL, 0, NULL)) {
// add compress later // add compress later
// pHead = rpcDecompressRpcMsg(pHead); // pHead = rpcDecompresSTransMsg(pHead);
} else { } else {
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
// impl later // impl later
// //
} }
SRpcMsg rpcMsg; STransMsg rpcMsg;
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content; rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
@ -260,7 +259,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
} }
tError("server conn %p read error: %s", conn, uv_err_name(nread)); tError("server conn %p read error: %s", conn, uv_err_name(nread));
if (nread < 0 || nread == UV_EOF) { if (nread < 0) {
conn->broken = true; conn->broken = true;
transUnrefSrvHandle(conn); transUnrefSrvHandle(conn);
@ -318,8 +317,8 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
// impl later; // impl later;
tTrace("server conn %p prepare to send resp", smsg->pConn); tTrace("server conn %p prepare to send resp", smsg->pConn);
SSrvConn* pConn = smsg->pConn; SSrvConn* pConn = smsg->pConn;
SRpcMsg* pMsg = &smsg->msg; STransMsg* pMsg = &smsg->msg;
if (pMsg->pCont == 0) { if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0); pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0; pMsg->contLen = 0;
@ -547,7 +546,7 @@ static bool addHandleToWorkloop(void* arg) {
return false; return false;
} }
// SRpcInfo* pRpc = pThrd->shandle; // STrans* pRpc = pThrd->shandle;
uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_open(pThrd->pipe, pThrd->fd); uv_pipe_open(pThrd->pipe, pThrd->fd);
@ -668,7 +667,7 @@ static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
return msgLen; return msgLen;
} }
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SServerObj* srv = calloc(1, sizeof(SServerObj)); SServerObj* srv = calloc(1, sizeof(SServerObj));
srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
srv->numOfThreads = numOfThreads; srv->numOfThreads = numOfThreads;
@ -720,7 +719,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
return srv; return srv;
End: End:
taosCloseServer(srv); transCloseServer(srv);
return NULL; return NULL;
} }
@ -740,7 +739,7 @@ void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
transSendAsync(pThrd->asyncPool, &srvMsg->q); transSendAsync(pThrd->asyncPool, &srvMsg->q);
} }
void taosCloseServer(void* arg) { void transCloseServer(void* arg) {
// impl later // impl later
SServerObj* srv = arg; SServerObj* srv = arg;
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
@ -786,7 +785,7 @@ void transUnrefSrvHandle(void* handle) {
} }
// unref srv handle // unref srv handle
} }
void rpcSendResponse(const SRpcMsg* pMsg) { void transSendResponse(const STransMsg* pMsg) {
if (pMsg->handle == NULL) { if (pMsg->handle == NULL) {
return; return;
} }
@ -799,14 +798,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
tTrace("server conn %p start to send resp", pConn); tTrace("server conn %p start to send resp", pConn);
transSendAsync(pThrd->asyncPool, &srvMsg->q); transSendAsync(pThrd->asyncPool, &srvMsg->q);
} }
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { SSrvConn* pConn = thandle;
SSrvConn* pConn = thandle;
struct sockaddr_in addr = pConn->addr; struct sockaddr_in addr = pConn->addr;
pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr); pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
pInfo->clientPort = ntohs(addr.sin_port); pInfo->clientPort = ntohs(addr.sin_port);
tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user)); tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
return 0; return 0;
} }

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "osLocale.h" #include "osLocale.h"

View File

@ -349,6 +349,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
// query // query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")

View File

@ -45,10 +45,6 @@ function gitPullBranchInfo () {
git pull origin $branch_name ||: git pull origin $branch_name ||:
echo "==== git pull $branch_name end ====" echo "==== git pull $branch_name end ===="
git pull --recurse-submodules git pull --recurse-submodules
cd tests
git checkout $branch_name
git pull
cd ..
} }
function compileTDengineVersion() { function compileTDengineVersion() {

View File

@ -14,13 +14,19 @@ $st = $stPrefix . $i
$tb = $tbPrefix . $i $tb = $tbPrefix . $i
print =============== step1 print =============== step1
sql create database $db replica 1 days 20 keep 2000 cache 16 vgroups 4 # quorum presicion
sql create database $db vgroups 8 replica 1 days 20 keep 3650 cache 32 blocks 12 minrows 80 maxrows 10000 wal 2 fsync 1000 comp 0 cachelast 2 precision 'us'
sql show databases sql show databases
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
if $rows != 1 then
return -1
endi
if $data00 != $db then if $data00 != $db then
return -1 return -1
endi endi
if $data02 != 4 then if $data02 != 8 then
return -1 return -1
endi endi
if $data03 != 0 then if $data03 != 0 then
@ -32,16 +38,23 @@ endi
if $data06 != 20 then if $data06 != 20 then
return -1 return -1
endi endi
if $data08 != 16 then if $data07 != 3650,3650,3650 then
return -1
endi
if $data08 != 32 then
return -1
endi
if $data09 != 12 then
return -1 return -1
endi endi
print =============== step2 print =============== step2
sql create database $db sql_error create database $db
sql create database if not exists $db
sql show databases sql show databases
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
print =============== step3 print =============== step3
sql drop database $db sql drop database $db

View File

@ -61,10 +61,50 @@ if $rows != 7 then
endi endi
print $data00 $data01 $data02 print $data00 $data01 $data02
print $data10 $data11 $data22 print $data10 $data11 $data12
print $data20 $data11 $data22 print $data20 $data21 $data22
print =============== create normal table
sql create database ndb
sql use ndb
sql create table nt0 (ts timestamp, i int)
sql create table if not exists nt0 (ts timestamp, i int)
sql create table nt1 (ts timestamp, i int)
sql create table if not exists nt1 (ts timestamp, i int)
sql create table if not exists nt3 (ts timestamp, i int)
sql show tables
if $rows != 3 then
return -1
endi
sql insert into nt0 values(now+1s, 1)(now+2s, 2)(now+3s, 3)
sql insert into nt1 values(now+1s, 1)(now+2s, 2)(now+3s, 3)
sql select * from nt1
if $rows != 3 then
return -1
endi
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $data01 != 1 then
return -1
endi
if $data11 != 2 then
return -1
endi
if $data21 != 3 then
return -1
endi
print =============== insert data print =============== insert data
sql use d1
sql insert into c1 values(now+1s, 1) sql insert into c1 values(now+1s, 1)
sql insert into c1 values(now+2s, 2) sql insert into c1 values(now+2s, 2)
sql insert into c1 values(now+3s, 3) sql insert into c1 values(now+3s, 3)
@ -95,7 +135,7 @@ endi
print $data00 $data01 print $data00 $data01
print $data10 $data11 print $data10 $data11
print $data20 $data11 print $data20 $data21
if $data01 != 1 then if $data01 != 1 then
return -1 return -1
@ -160,7 +200,7 @@ endi
print $data00 $data01 print $data00 $data01
print $data10 $data11 print $data10 $data11
print $data20 $data11 print $data20 $data21
if $data01 != 1 then if $data01 != 1 then
return -1 return -1
@ -210,4 +250,27 @@ if $rows != 21 then
return -1 return -1
endi endi
print =============== query data from normal table after restart dnode
sql use ndb
sql select * from nt1
if $rows != 3 then
return -1
endi
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $data01 != 1 then
return -1
endi
if $data11 != 2 then
return -1
endi
if $data21 != 3 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT