Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

This commit is contained in:
Minghao Li 2022-03-06 19:18:34 +08:00
commit f2ac795724
37 changed files with 975 additions and 298 deletions

View File

@ -671,6 +671,11 @@ typedef struct {
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
int64_t numOfSelectReqs;
int64_t numOfInsertReqs;
int64_t numOfInsertSuccessReqs;
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
} SVnodeLoad;
typedef struct {
@ -1837,6 +1842,149 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
}
return buf;
}
typedef enum {
TD_TIME_UNIT_UNKNOWN = -1,
TD_TIME_UNIT_YEAR = 0,
TD_TIME_UNIT_SEASON = 1,
TD_TIME_UNIT_MONTH = 2,
TD_TIME_UNIT_WEEK = 3,
TD_TIME_UNIT_DAY = 4,
TD_TIME_UNIT_HOUR = 5,
TD_TIME_UNIT_MINUTE = 6,
TD_TIME_UNIT_SEC = 7,
TD_TIME_UNIT_MILLISEC = 8,
TD_TIME_UNIT_MICROSEC = 9,
TD_TIME_UNIT_NANOSEC = 10
} ETDTimeUnit;
typedef struct {
uint8_t version; // for compatibility
uint8_t intervalUnit;
uint8_t slidingUnit;
char indexName[TSDB_INDEX_NAME_LEN + 1];
col_id_t numOfColIds;
uint16_t numOfFuncIds;
uint64_t tableUid; // super/common table uid
int64_t interval;
int64_t sliding;
col_id_t* colIds; // N.B. sorted column ids
uint16_t* funcIds; // N.B. sorted sma function ids
} STSma; // Time-range-wise SMA
typedef struct {
uint32_t number;
STSma* tSma;
} STSmaWrapper;
static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) {
if (pSma) {
tfree(pSma->colIds);
tfree(pSma->funcIds);
if (releaseSelf) {
free(pSma);
}
}
}
static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) {
if (pSW && pSW->tSma) {
for (uint32_t i = 0; i < pSW->number; ++i) {
tdDestroyTSma(pSW->tSma + i, false);
}
tfree(pSW->tSma);
}
}
static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
int32_t tlen = 0;
tlen += taosEncodeFixedU8(buf, pSma->version);
tlen += taosEncodeFixedU8(buf, pSma->intervalUnit);
tlen += taosEncodeFixedU8(buf, pSma->slidingUnit);
tlen += taosEncodeString(buf, pSma->indexName);
tlen += taosEncodeFixedU16(buf, pSma->numOfColIds);
tlen += taosEncodeFixedU16(buf, pSma->numOfFuncIds);
tlen += taosEncodeFixedU64(buf, pSma->tableUid);
tlen += taosEncodeFixedI64(buf, pSma->interval);
tlen += taosEncodeFixedI64(buf, pSma->sliding);
for (col_id_t i = 0; i < pSma->numOfColIds; ++i) {
tlen += taosEncodeFixedU16(buf, *(pSma->colIds + i));
}
for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) {
tlen += taosEncodeFixedU16(buf, *(pSma->funcIds + i));
}
return tlen;
}
static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* pSW) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pSW->number);
for (uint32_t i = 0; i < pSW->number; ++i) {
tlen += tEncodeTSma(buf, pSW->tSma + i);
}
return tlen;
}
static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
buf = taosDecodeFixedU8(buf, &pSma->version);
buf = taosDecodeFixedU8(buf, &pSma->intervalUnit);
buf = taosDecodeFixedU8(buf, &pSma->slidingUnit);
buf = taosDecodeStringTo(buf, pSma->indexName);
buf = taosDecodeFixedU16(buf, &pSma->numOfColIds);
buf = taosDecodeFixedU16(buf, &pSma->numOfFuncIds);
buf = taosDecodeFixedU64(buf, &pSma->tableUid);
buf = taosDecodeFixedI64(buf, &pSma->interval);
buf = taosDecodeFixedI64(buf, &pSma->sliding);
if (pSma->numOfColIds > 0) {
pSma->colIds = (col_id_t*)calloc(pSma->numOfColIds, sizeof(STSma));
if (pSma->colIds == NULL) {
return NULL;
}
for (uint16_t i = 0; i < pSma->numOfColIds; ++i) {
buf = taosDecodeFixedU16(buf, pSma->colIds + i);
}
} else {
pSma->colIds = NULL;
}
if (pSma->numOfFuncIds > 0) {
pSma->funcIds = (uint16_t*)calloc(pSma->numOfFuncIds, sizeof(STSma));
if (pSma->funcIds == NULL) {
return NULL;
}
for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) {
buf = taosDecodeFixedU16(buf, pSma->funcIds + i);
}
} else {
pSma->funcIds = NULL;
}
return buf;
}
static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) {
buf = taosDecodeFixedU32(buf, &pSW->number);
pSW->tSma = (STSma*)calloc(pSW->number, sizeof(STSma));
if (pSW->tSma == NULL) {
return NULL;
}
for (uint32_t i = 0; i < pSW->number; ++i) {
if ((buf = tDecodeTSma(buf, pSW->tSma + i)) == NULL) {
for (uint32_t j = i; j >= 0; --i) {
tdDestroyTSma(pSW->tSma + j, false);
}
free(pSW->tSma);
return NULL;
}
}
return buf;
}
typedef struct {
int64_t uid;

View File

@ -95,20 +95,17 @@ typedef struct {
int64_t disk_engine; // Byte
int64_t disk_used; // Byte
int64_t disk_total; // Byte
double net_in; // bytes per second
double net_out; // bytes per second
double io_read;
double io_write;
double io_read_disk;
double io_write_disk;
int32_t req_select;
float req_select_rate;
int32_t req_insert;
int32_t req_insert_success;
float req_insert_rate;
int32_t req_insert_batch;
int32_t req_insert_batch_success;
float req_insert_batch_rate;
int64_t net_in; // bytes
int64_t net_out; // bytes
int64_t io_read; // bytes
int64_t io_write; // bytes
int64_t io_read_disk; // bytes
int64_t io_write_disk; // bytes
int64_t req_select;
int64_t req_insert;
int64_t req_insert_success;
int64_t req_insert_batch;
int64_t req_insert_batch_success;
int32_t errors;
int32_t vnodes_num;
int32_t masters;

View File

@ -43,10 +43,8 @@ int32_t taosGetTotalMemory(int64_t *totalKB);
int32_t taosGetProcMemory(int64_t *usedKB);
int32_t taosGetSysMemory(int64_t *usedKB);
int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize);
int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes);
int32_t taosGetIOSpeed(double *readKB, double *writeKB, double *readDiskKB, double *writeDiskKB);
int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes);
int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes);
int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec);
int32_t taosSystem(const char *cmd);
void taosKillSystem();

View File

@ -206,6 +206,7 @@ typedef enum ELogicConditionType {
#define TSDB_FUNC_TYPE_AGGREGATE 2
#define TSDB_FUNC_MAX_RETRIEVE 1024
#define TSDB_INDEX_NAME_LEN 32
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN

View File

@ -105,10 +105,19 @@ typedef struct {
} SBnodeMgmt;
typedef struct {
int32_t openVnodes;
int32_t totalVnodes;
int32_t masterNum;
int64_t numOfSelectReqs;
int64_t numOfInsertReqs;
int64_t numOfInsertSuccessReqs;
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
} SVnodesStat;
typedef struct {
SVnodesStat stat;
SHashObj *hash;
int32_t openVnodes;
int32_t totalVnodes;
int32_t masterNum;
SRWLatch latch;
SQWorkerPool queryPool;
SFWorkerPool fetchPool;

View File

@ -34,7 +34,6 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
int8_t dndIsMnode(SDnode *pDnode);
#ifdef __cplusplus
}

View File

@ -489,20 +489,19 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
pInfo->disk_engine = 0;
pInfo->disk_used = tsDataSpace.size.used;
pInfo->disk_total = tsDataSpace.size.total;
taosGetBandSpeed(&pInfo->net_in, &pInfo->net_out);
taosGetIOSpeed(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
pInfo->req_select = 0;
pInfo->req_select_rate = 0;
pInfo->req_insert = 0;
pInfo->req_insert_success = 0;
pInfo->req_insert_rate = 0;
pInfo->req_insert_batch = 0;
pInfo->req_insert_batch_success = 0;
pInfo->req_insert_batch_rate = 0;
taosGetCardInfo(&pInfo->net_in, &pInfo->net_out);
taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
SVnodesStat *pStat = &pDnode->vmgmt.stat;
pInfo->req_select = pStat->numOfSelectReqs;
pInfo->req_insert = pStat->numOfInsertReqs;
pInfo->req_insert_success = pStat->numOfInsertSuccessReqs;
pInfo->req_insert_batch = pStat->numOfBatchInsertReqs;
pInfo->req_insert_batch_success = pStat->numOfBatchInsertSuccessReqs;
pInfo->errors = tsNumOfErrorLogs;
pInfo->vnodes_num = pDnode->vmgmt.totalVnodes;
pInfo->masters = pDnode->vmgmt.masterNum;
pInfo->has_mnode = dndIsMnode(pDnode);
pInfo->vnodes_num = pStat->totalVnodes;
pInfo->masters = pStat->masterNum;
pInfo->has_mnode = pDnode->mmgmt.deployed;
}
static void dndSendMonitorReport(SDnode *pDnode) {

View File

@ -640,10 +640,3 @@ int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SM
dndReleaseMnode(pDnode, pMnode);
return code;
}
int8_t dndIsMnode(SDnode *pDnode) {
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL) return 0;
dndReleaseMnode(pDnode, pMnode);
return 1;
}

View File

@ -382,7 +382,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
pMgmt->openVnodes, pMgmt->totalVnodes);
pMgmt->stat.openVnodes, pMgmt->stat.totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
@ -396,7 +396,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
pThread->opened++;
}
atomic_add_fetch_32(&pMgmt->openVnodes, 1);
atomic_add_fetch_32(&pMgmt->stat.openVnodes, 1);
}
dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
@ -422,7 +422,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
return -1;
}
pMgmt->totalVnodes = numOfVnodes;
pMgmt->stat.totalVnodes = numOfVnodes;
int32_t threadNum = tsNumOfCores;
#if 1
@ -470,11 +470,11 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
free(threads);
free(pCfgs);
if (pMgmt->openVnodes != pMgmt->totalVnodes) {
dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes);
if (pMgmt->stat.openVnodes != pMgmt->stat.totalVnodes) {
dError("there are total vnodes:%d, opened:%d", pMgmt->stat.totalVnodes, pMgmt->stat.openVnodes);
return -1;
} else {
dInfo("total vnodes:%d open successfully", pMgmt->totalVnodes);
dInfo("total vnodes:%d open successfully", pMgmt->stat.totalVnodes);
return 0;
}
}
@ -980,13 +980,18 @@ void dndCleanupVnodes(SDnode *pDnode) {
void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodesStat *pStat = &pMgmt->stat;
int32_t totalVnodes = 0;
int32_t masterNum = 0;
int64_t numOfSelectReqs = 0;
int64_t numOfInsertReqs = 0;
int64_t numOfInsertSuccessReqs = 0;
int64_t numOfBatchInsertReqs = 0;
int64_t numOfBatchInsertSuccessReqs = 0;
taosRLockLatch(&pMgmt->latch);
int32_t v = 0;
void *pIter = taosHashIterate(pMgmt->hash, NULL);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
@ -996,12 +1001,24 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
vnodeGetLoad(pVnode->pImpl, &vload);
taosArrayPush(pLoads, &vload);
numOfSelectReqs += vload.numOfSelectReqs;
numOfInsertReqs += vload.numOfInsertReqs;
numOfInsertSuccessReqs += vload.numOfInsertSuccessReqs;
numOfBatchInsertReqs += vload.numOfBatchInsertReqs;
numOfBatchInsertSuccessReqs += vload.numOfBatchInsertSuccessReqs;
totalVnodes++;
if (vload.role == TAOS_SYNC_STATE_LEADER) masterNum++;
pIter = taosHashIterate(pMgmt->hash, pIter);
}
taosRUnLockLatch(&pMgmt->latch);
pMgmt->totalVnodes = totalVnodes;
pMgmt->masterNum = masterNum;
pStat->totalVnodes = totalVnodes;
pStat->masterNum = masterNum;
pStat->numOfSelectReqs = numOfSelectReqs;
pStat->numOfInsertReqs = numOfInsertReqs;
pStat->numOfInsertSuccessReqs = numOfInsertSuccessReqs;
pStat->numOfBatchInsertReqs = numOfBatchInsertReqs;
pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
}

View File

@ -54,5 +54,5 @@ elseif(${META_DB_IMPL} STREQUAL "TDB")
endif()
if(${BUILD_TEST})
# add_subdirectory(test)
add_subdirectory(test)
endif(${BUILD_TEST})

View File

@ -38,8 +38,10 @@ typedef struct SMetaCfg {
typedef struct SMTbCursor SMTbCursor;
typedef struct SMCtbCursor SMCtbCursor;
typedef struct SMSmaCursor SMSmaCursor;
typedef SVCreateTbReq STbCfg;
typedef STSma SSmaCfg;
// SMeta operations
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
@ -50,19 +52,24 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int metaCommit(SMeta *pMeta);
// For Query
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
SSmaCfg * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName);
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
char *metaTbCursorNext(SMTbCursor *pTbCur);
char * metaTbCursorNext(SMTbCursor *pTbCur);
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseSmaCurosr(SMSmaCursor *pSmaCur);
const char * metaSmaCursorNext(SMSmaCursor *pSmaCur);
// Options
void metaOptionsInit(SMetaCfg *pMetaCfg);
void metaOptionsClear(SMetaCfg *pMetaCfg);

View File

@ -33,6 +33,8 @@ int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, SSmaCfg* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, const char* indexName);
// SMetaCache
int metaOpenCache(SMeta* pMeta);

View File

@ -16,6 +16,10 @@
#ifndef _TD_TSDB_COMMIT_H_
#define _TD_TSDB_COMMIT_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
int minFid;
int midFid;
@ -66,4 +70,8 @@ int tsdbApplyRtn(STsdbRepo *pRepo);
#endif
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_COMMIT_H_ */

View File

@ -18,6 +18,10 @@
#include "tsdbFile.h"
#ifdef __cplusplus
extern "C" {
#endif
// ================== TSDB global config
extern bool tsdbForceKeepFile;
@ -111,4 +115,8 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
return 0;
}
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_FS_H_ */

View File

@ -19,6 +19,10 @@
#include "tchecksum.h"
#include "tfs.h"
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
@ -410,4 +414,8 @@ static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
return true;
}
#ifdef __cplusplus
}
#endif
#endif /* _TS_TSDB_FILE_H_ */

View File

@ -18,6 +18,10 @@
#include "tlog.h"
#ifdef __cplusplus
extern "C" {
#endif
extern int32_t tsdbDebugFlag;
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
@ -27,4 +31,8 @@ extern int32_t tsdbDebugFlag;
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_LOG_H_ */

View File

@ -16,6 +16,10 @@
#ifndef _TD_TSDB_MEMORY_H_
#define _TD_TSDB_MEMORY_H_
#ifdef __cplusplus
extern "C" {
#endif
static void * taosTMalloc(size_t size);
static void * taosTCalloc(size_t nmemb, size_t size);
static void * taosTRealloc(void *ptr, size_t size);
@ -70,5 +74,8 @@ static FORCE_INLINE void* taosTZfree(void* ptr) {
return NULL;
}
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_MEMORY_H_ */

View File

@ -24,6 +24,10 @@
#include "tsdbMemory.h"
#include "tcommon.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SReadH SReadH;
typedef struct {
@ -244,4 +248,8 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
return 0;
}
#ifdef __cplusplus
}
#endif
#endif /*_TD_TSDB_READ_IMPL_H_*/

View File

@ -38,11 +38,14 @@ struct SMetaDB {
// DB
DB *pTbDB;
DB *pSchemaDB;
DB *pSmaDB;
// IDX
DB *pNameIdx;
DB *pStbIdx;
DB *pNtbIdx;
DB *pCtbIdx;
DB *pSmaIdx;
// ENV
DB_ENV *pEvn;
};
@ -61,6 +64,7 @@ static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT
static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static void metaClearTbCfg(STbCfg *pTbCfg);
@ -100,6 +104,11 @@ int metaOpenDB(SMeta *pMeta) {
return -1;
}
if (metaOpenBDBDb(&(pDB->pSmaDB), pDB->pEvn, "sma.db", false) < 0) {
metaCloseDB(pMeta);
return -1;
}
// Open Indices
if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "name.index", pDB->pTbDB, &metaNameIdxCb, false) < 0) {
metaCloseDB(pMeta);
@ -121,15 +130,22 @@ int metaOpenDB(SMeta *pMeta) {
return -1;
}
if (metaOpenBDBIdx(&(pDB->pSmaIdx), pDB->pEvn, "sma.index", pDB->pSmaDB, &metaSmaIdxCb, true) < 0) {
metaCloseDB(pMeta);
return -1;
}
return 0;
}
void metaCloseDB(SMeta *pMeta) {
if (pMeta->pDB) {
metaCloseBDBIdx(pMeta->pDB->pSmaIdx);
metaCloseBDBIdx(pMeta->pDB->pCtbIdx);
metaCloseBDBIdx(pMeta->pDB->pNtbIdx);
metaCloseBDBIdx(pMeta->pDB->pStbIdx);
metaCloseBDBIdx(pMeta->pDB->pNameIdx);
metaCloseBDBDb(pMeta->pDB->pSmaDB);
metaCloseBDBDb(pMeta->pDB->pSchemaDB);
metaCloseBDBDb(pMeta->pDB->pTbDB);
metaCloseBDBEnv(pMeta->pDB->pEvn);
@ -210,6 +226,49 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
return 0;
}
int metaSaveSmaToDB(SMeta *pMeta, SSmaCfg *pSmaCfg) {
char buf[512] = {0}; // TODO: may overflow
void *pBuf = NULL;
DBT key1 = {0}, value1 = {0};
{
// save sma info
pBuf = buf;
key1.data = pSmaCfg->indexName;
key1.size = strlen(key1.data);
tEncodeTSma(&pBuf, pSmaCfg);
value1.data = buf;
value1.size = POINTER_DISTANCE(pBuf, buf);
value1.app_data = pSmaCfg;
}
metaDBWLock(pMeta->pDB);
pMeta->pDB->pSmaDB->put(pMeta->pDB->pSmaDB, NULL, &key1, &value1, 0);
metaDBULock(pMeta->pDB);
return 0;
}
int metaRemoveSmaFromDb(SMeta *pMeta, const char *indexName) {
// TODO
#if 0
DBT key = {0};
key.data = (void *)indexName;
key.size = strlen(indexName);
metaDBWLock(pMeta->pDB);
// TODO: No guarantee of consistence.
// Use transaction or DB->sync() for some guarantee.
pMeta->pDB->pSmaDB->del(pMeta->pDB->pSmaDB, NULL, &key, 0);
metaDBULock(pMeta->pDB);
#endif
return 0;
}
/* ------------------------ STATIC METHODS ------------------------ */
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) {
int tlen = 0;
@ -425,6 +484,16 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey
}
}
static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
SSmaCfg *pSmaCfg = (SSmaCfg *)(pValue->app_data);
memset(pSKey, 0, sizeof(*pSKey));
pSKey->data = &(pSmaCfg->tableUid);
pSKey->size = sizeof(pSmaCfg->tableUid);
return 0;
}
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
int tsize = 0;
@ -540,6 +609,36 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
return pTbCfg;
}
SSmaCfg *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
SSmaCfg *pCfg = NULL;
SMetaDB *pDB = pMeta->pDB;
DBT key = {0};
DBT value = {0};
int ret;
// Set key/value
key.data = (void *)indexName;
key.size = strlen(indexName);
// Query
metaDBRLock(pDB);
ret = pDB->pTbDB->get(pDB->pSmaDB, NULL, &key, &value, 0);
metaDBULock(pDB);
if (ret != 0) {
return NULL;
}
// Decode
pCfg = (SSmaCfg *)malloc(sizeof(SSmaCfg));
if (pCfg == NULL) {
return NULL;
}
tDecodeTSma(value.data, pCfg);
return pCfg;
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
uint32_t nCols;
SSchemaWrapper *pSW = NULL;
@ -718,6 +817,61 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
}
}
struct SMSmaCursor {
DBC *pCur;
tb_uid_t uid;
};
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
SMSmaCursor *pCur = NULL;
SMetaDB *pDB = pMeta->pDB;
int ret;
pCur = (SMSmaCursor *)calloc(1, sizeof(*pCur));
if (pCur == NULL) {
return NULL;
}
pCur->uid = uid;
ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &(pCur->pCur), 0);
if (ret != 0) {
free(pCur);
return NULL;
}
return pCur;
}
void metaCloseSmaCurosr(SMSmaCursor *pCur) {
if (pCur) {
if (pCur->pCur) {
pCur->pCur->close(pCur->pCur);
}
free(pCur);
}
}
const char* metaSmaCursorNext(SMSmaCursor *pCur) {
DBT skey = {0};
DBT pkey = {0};
DBT pval = {0};
void *pBuf;
// Set key
skey.data = &(pCur->uid);
skey.size = sizeof(pCur->uid);
if (pCur->pCur->pget(pCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) {
const char* indexName = (const char *)pkey.data;
assert(indexName != NULL);
return indexName;
} else {
return 0;
}
}
static void metaDBWLock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
pthread_rwlock_wrlock(&(pDB->rwlock));

View File

@ -106,3 +106,20 @@ int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) {
// TODO
return 0;
}
int metaCreateSma(SMeta *pMeta, SSmaCfg *pSmaCfg) {
// Validate the tbOptions
// if (metaValidateTbCfg(pMeta, pTbCfg) < 0) {
// // TODO: handle error
// return -1;
// }
// TODO: add atomicity
if (metaSaveSmaToDB(pMeta, pSmaCfg) < 0) {
// TODO: handle error
return -1;
}
return 0;
}

View File

@ -32,6 +32,11 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->totalStorage = 300;
pLoad->compStorage = 200;
pLoad->pointsWritten = 100;
pLoad->numOfSelectReqs = 1;
pLoad->numOfInsertReqs = 3;
pLoad->numOfInsertSuccessReqs = 2;
pLoad->numOfBatchInsertReqs = 5;
pLoad->numOfBatchInsertSuccessReqs = 4;
return 0;
}

View File

@ -41,10 +41,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
return 0;
}
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVCreateTbReq vCreateTbReq;
SVCreateTbBatchReq vCreateTbBatchReq;
void *ptr = NULL;
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
void *ptr = NULL;
if (pVnode->config.streamMode == 0) {
ptr = vnodeMalloc(pVnode, pMsg->contLen);
@ -64,7 +62,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
switch (pMsg->msgType) {
case TDMT_VND_CREATE_STB:
case TDMT_VND_CREATE_STB: {
SVCreateTbReq vCreateTbReq = {0};
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
// TODO: handle error
@ -75,7 +74,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
free(vCreateTbReq.stbCfg.pTagSchema);
free(vCreateTbReq.name);
break;
case TDMT_VND_CREATE_TABLE:
}
case TDMT_VND_CREATE_TABLE: {
SVCreateTbBatchReq vCreateTbBatchReq = {0};
tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
@ -97,14 +98,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray));
taosArrayDestroy(vCreateTbBatchReq.pArray);
break;
case TDMT_VND_ALTER_STB:
}
case TDMT_VND_ALTER_STB: {
SVCreateTbReq vAlterTbReq = {0};
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
free(vCreateTbReq.stbCfg.pSchema);
free(vCreateTbReq.stbCfg.pTagSchema);
free(vCreateTbReq.name);
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
free(vAlterTbReq.stbCfg.pSchema);
free(vAlterTbReq.stbCfg.pTagSchema);
free(vAlterTbReq.name);
break;
}
case TDMT_VND_DROP_STB:
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
break;

View File

@ -1,20 +1,39 @@
add_executable(tqTest "")
target_sources(tqTest
PRIVATE
"tqMetaTest.cpp"
)
target_include_directories(tqTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/vnode/tq"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
MESSAGE(STATUS "vnode unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
# add_executable(tqTest "")
# target_sources(tqTest
# PRIVATE
# "tqMetaTest.cpp"
# )
# target_include_directories(tqTest
# PUBLIC
# "${CMAKE_SOURCE_DIR}/include/server/vnode/tq"
# "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
# )
# target_link_libraries(tqTest
# tq
# gtest_main
# )
# enable_testing()
# add_test(
# NAME tq_test
# COMMAND tqTest
# )
ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp)
TARGET_LINK_LIBRARIES(
tsdbSmaTest
PUBLIC os util common vnode gtest_main
)
target_link_libraries(tqTest
tq
gtest_main
)
enable_testing()
add_test(
NAME tq_test
COMMAND tqTest
)
TARGET_INCLUDE_DIRECTORIES(
tsdbSmaTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/common"
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../src/inc"
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)

View File

@ -0,0 +1,227 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#include <metaDef.h>
#include <tmsg.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, tSmaEncodeDecodeTest) {
// encode
STSma tSma = {0};
tSma.version = 0;
tSma.intervalUnit = TD_TIME_UNIT_DAY;
tSma.interval = 1;
tSma.slidingUnit = TD_TIME_UNIT_HOUR;
tSma.sliding = 0;
tstrncpy(tSma.indexName, "sma_index_test", TSDB_INDEX_NAME_LEN);
tSma.tableUid = 1234567890;
tSma.numOfColIds = 2;
tSma.numOfFuncIds = 5; // sum/min/max/avg/last
tSma.colIds = (col_id_t *)calloc(tSma.numOfColIds, sizeof(col_id_t));
tSma.funcIds = (uint16_t *)calloc(tSma.numOfFuncIds, sizeof(uint16_t));
for (int32_t i = 0; i < tSma.numOfColIds; ++i) {
*(tSma.colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
}
for (int32_t i = 0; i < tSma.numOfFuncIds; ++i) {
*(tSma.funcIds + i) = (i + 2);
}
STSmaWrapper tSmaWrapper = {.number = 1, .tSma = &tSma};
uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper);
void *buf = calloc(bufLen, 1);
assert(buf != NULL);
STSmaWrapper *pSW = (STSmaWrapper *)buf;
uint32_t len = tEncodeTSmaWrapper(&buf, &tSmaWrapper);
EXPECT_EQ(len, bufLen);
// decode
STSmaWrapper dstTSmaWrapper = {0};
void * result = tDecodeTSmaWrapper(pSW, &dstTSmaWrapper);
assert(result != NULL);
EXPECT_EQ(tSmaWrapper.number, dstTSmaWrapper.number);
for (int i = 0; i < tSmaWrapper.number; ++i) {
STSma *pSma = tSmaWrapper.tSma + i;
STSma *qSma = dstTSmaWrapper.tSma + i;
EXPECT_EQ(pSma->version, qSma->version);
EXPECT_EQ(pSma->intervalUnit, qSma->intervalUnit);
EXPECT_EQ(pSma->slidingUnit, qSma->slidingUnit);
EXPECT_STRCASEEQ(pSma->indexName, qSma->indexName);
EXPECT_EQ(pSma->numOfColIds, qSma->numOfColIds);
EXPECT_EQ(pSma->numOfFuncIds, qSma->numOfFuncIds);
EXPECT_EQ(pSma->tableUid, qSma->tableUid);
EXPECT_EQ(pSma->interval, qSma->interval);
EXPECT_EQ(pSma->sliding, qSma->sliding);
for (uint32_t j = 0; j < pSma->numOfColIds; ++j) {
EXPECT_EQ(*(col_id_t *)(pSma->colIds + j), *(col_id_t *)(qSma->colIds + j));
}
for (uint32_t j = 0; j < pSma->numOfFuncIds; ++j) {
EXPECT_EQ(*(uint16_t *)(pSma->funcIds + j), *(uint16_t *)(qSma->funcIds + j));
}
}
// resource release
tdDestroyTSma(&tSma, false);
tdDestroyTSmaWrapper(&dstTSmaWrapper);
}
TEST(testCase, tSma_DB_Put_Get_Del_Test) {
const char *smaIndexName1 = "sma_index_test_1";
const char *smaIndexName2 = "sma_index_test_2";
const char *smaTestDir = "./smaTest";
const uint64_t tbUid = 1234567890;
// encode
STSma tSma = {0};
tSma.version = 0;
tSma.intervalUnit = TD_TIME_UNIT_DAY;
tSma.interval = 1;
tSma.slidingUnit = TD_TIME_UNIT_HOUR;
tSma.sliding = 0;
tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN);
tSma.tableUid = tbUid;
tSma.numOfColIds = 2;
tSma.numOfFuncIds = 5; // sum/min/max/avg/last
tSma.colIds = (col_id_t *)calloc(tSma.numOfColIds, sizeof(col_id_t));
tSma.funcIds = (uint16_t *)calloc(tSma.numOfFuncIds, sizeof(uint16_t));
for (int32_t i = 0; i < tSma.numOfColIds; ++i) {
*(tSma.colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
}
for (int32_t i = 0; i < tSma.numOfFuncIds; ++i) {
*(tSma.funcIds + i) = (i + 2);
}
SMeta * pMeta = NULL;
SSmaCfg * pSmaCfg = &tSma;
const SMetaCfg *pMetaCfg = &defaultMetaOptions;
taosRemoveDir(smaTestDir);
pMeta = metaOpen(smaTestDir, pMetaCfg, NULL);
assert(pMeta != NULL);
// save index 1
metaSaveSmaToDB(pMeta, pSmaCfg);
tstrncpy(pSmaCfg->indexName, smaIndexName2, TSDB_INDEX_NAME_LEN);
pSmaCfg->version = 1;
pSmaCfg->intervalUnit = TD_TIME_UNIT_HOUR;
pSmaCfg->interval = 1;
pSmaCfg->slidingUnit = TD_TIME_UNIT_MINUTE;
pSmaCfg->sliding = 5;
// save index 2
metaSaveSmaToDB(pMeta, pSmaCfg);
// get value by indexName
SSmaCfg *qSmaCfg = NULL;
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName1);
assert(qSmaCfg != NULL);
printf("name1 = %s\n", qSmaCfg->indexName);
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid);
tdDestroyTSma(qSmaCfg, true);
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName2);
assert(qSmaCfg != NULL);
printf("name2 = %s\n", qSmaCfg->indexName);
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
EXPECT_EQ(qSmaCfg->interval, tSma.interval);
tdDestroyTSma(qSmaCfg, true);
// get value by table uid
SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid);
assert(pSmaCur != NULL);
uint32_t indexCnt = 0;
while (1) {
const char* indexName = metaSmaCursorNext(pSmaCur);
if (indexName == NULL) {
break;
}
printf("indexName = %s\n", indexName);
++indexCnt;
}
EXPECT_EQ(indexCnt, 2);
metaCloseSmaCurosr(pSmaCur);
// resource release
metaRemoveSmaFromDb(pMeta, smaIndexName1);
metaRemoveSmaFromDb(pMeta, smaIndexName2);
tdDestroyTSma(&tSma, false);
metaClose(pMeta);
}
#if 0
TEST(testCase, tSmaInsertTest) {
STSma tSma = {0};
STSmaData* pSmaData = NULL;
STsdb tsdb = {0};
// init
tSma.intervalUnit = TD_TIME_UNIT_DAY;
tSma.interval = 1;
tSma.numOfFuncIds = 5; // sum/min/max/avg/last
int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t);
int32_t numOfColIds = 3;
int32_t numOfSmaBlocks = 10;
int32_t dataLen = numOfColIds * numOfSmaBlocks * blockSize;
pSmaData = (STSmaData*)malloc(sizeof(STSmaData) + dataLen);
ASSERT_EQ(pSmaData != NULL, true);
pSmaData->tableUid = 3232329230;
pSmaData->numOfColIds = numOfColIds;
pSmaData->numOfSmaBlocks = numOfSmaBlocks;
pSmaData->dataLen = dataLen;
pSmaData->tsWindow.skey = 1640000000;
pSmaData->tsWindow.ekey = 1645788649;
pSmaData->colIds = (col_id_t*)malloc(sizeof(col_id_t) * numOfColIds);
ASSERT_EQ(pSmaData->colIds != NULL, true);
for (int32_t i = 0; i < numOfColIds; ++i) {
*(pSmaData->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
}
// execute
EXPECT_EQ(tsdbInsertTSmaData(&tsdb, &tSma, pSmaData), TSDB_CODE_SUCCESS);
// release
tdDestroySmaData(pSmaData);
}
#endif
#pragma GCC diagnostic pop

View File

@ -96,8 +96,10 @@ typedef struct SIndexTermQuery {
typedef struct Iterate Iterate;
typedef struct IterateValue {
int8_t type; // opera type, ADD_VALUE/DELETE_VALUE
char* colVal;
int8_t type; // opera type, ADD_VALUE/DELETE_VALUE
uint64_t ver; // data ver, tfile data version is 0
char* colVal;
SArray* val;
} IterateValue;

View File

@ -16,6 +16,7 @@
#define __INDEX_CACHE_H__
#include "indexInt.h"
#include "index_util.h"
#include "tskiplist.h"
// ----------------- key structure in skiplist ---------------------
@ -52,8 +53,9 @@ typedef struct CacheTerm {
char* colVal;
int32_t version;
// value
uint64_t uid;
int8_t colType;
uint64_t uid;
int8_t colType;
SIndexOperOnColumn operaType;
} CacheTerm;
//
@ -68,7 +70,7 @@ void indexCacheIteratorDestroy(Iterate* iiter);
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid);
// int indexCacheGet(void *cache, uint64_t *rst);
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s);
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s);
void indexCacheRef(IndexCache* cache);
void indexCacheUnRef(IndexCache* cache);

View File

@ -19,6 +19,7 @@
#include "index_fst.h"
#include "index_fst_counting_writer.h"
#include "index_tfile.h"
#include "index_util.h"
#include "tlockfree.h"
#ifdef __cplusplus
@ -103,7 +104,7 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx);
void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr);
void tfileReaderRef(TFileReader* reader);
void tfileReaderUnRef(TFileReader* reader);
@ -118,7 +119,7 @@ int tfileWriterFinish(TFileWriter* tw);
IndexTFile* indexTFileCreate(const char* path);
void indexTFileDestroy(IndexTFile* tfile);
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result);
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* tr);
Iterate* tfileIteratorCreate(TFileReader* reader);
void tfileIteratorDestroy(Iterate* iterator);

View File

@ -47,6 +47,19 @@ extern "C" {
buf += len; \
} while (0)
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
{ \
bool f = false; \
for (int i = 0; i < taosArrayGetSize(src); i++) { \
if (*(uint64_t *)taosArrayGet(src, i) == tgt) { \
f = true; \
} \
} \
if (f == false) { \
taosArrayPush(dst, &tgt); \
} \
}
/* multi sorted result intersection
* input: [1, 2, 4, 5]
* [2, 3, 4, 5]
@ -66,10 +79,32 @@ void iUnion(SArray *interResults, SArray *finalResult);
/* sorted array
* total: [1, 2, 4, 5, 7, 8]
* except: [4, 5]
* return: [1, 2, 7, 8]
* return: [1, 2, 7, 8] saved in total
*/
void iExcept(SArray *total, SArray *except);
int uidCompare(const void *a, const void *b);
// data with ver
typedef struct {
uint32_t ver;
uint64_t data;
} SIdxVerdata;
typedef struct {
SArray *total;
SArray *added;
SArray *deled;
} SIdxTempResult;
SIdxTempResult *sIdxTempResultCreate();
void sIdxTempResultClear(SIdxTempResult *tr);
void sIdxTempResultDestroy(SIdxTempResult *tr);
void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr);
#ifdef __cplusplus
}
#endif

View File

@ -31,18 +31,6 @@
void* indexQhandle = NULL;
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
{ \
bool f = false; \
for (int i = 0; i < taosArrayGetSize(src); i++) { \
if (*(uint64_t*)taosArrayGet(src, i) == tgt) { \
f = true; \
} \
} \
if (f == false) { \
taosArrayPush(dst, &tgt); \
} \
}
void indexInit() {
// refactor later
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
@ -52,23 +40,11 @@ void indexCleanUp() {
taosCleanUpScheduler(indexQhandle);
}
static int uidCompare(const void* a, const void* b) {
// add more version compare
uint64_t u1 = *(uint64_t*)a;
uint64_t u2 = *(uint64_t*)b;
return u1 - u2;
}
typedef struct SIdxColInfo {
int colId; // generated by index internal
int cVersion;
} SIdxColInfo;
typedef struct SIdxTempResult {
SArray* total;
SArray* added;
SArray* deled;
} SIdxTempResult;
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
// static void indexInit();
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
@ -255,6 +231,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
#ifdef USE_INVERTED_INDEX
#endif
return 1;
@ -363,22 +340,30 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
*result = taosArrayInit(4, sizeof(uint64_t));
// TODO: iterator mem and tidex
STermValueType s = kTypeValue;
if (0 == indexCacheSearch(cache, query, *result, &s)) {
SIdxTempResult* tr = sIdxTempResultCreate();
if (0 == indexCacheSearch(cache, query, tr, &s)) {
if (s == kTypeDeletion) {
indexInfo("col: %s already drop by", term->colName);
// coloum already drop by other oper, no need to query tindex
return 0;
} else {
if (0 != indexTFileSearch(sIdx->tindex, query, *result)) {
if (0 != indexTFileSearch(sIdx->tindex, query, tr)) {
indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
return -1;
goto END;
}
}
} else {
indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
return -1;
goto END;
}
sIdxTempResultMergeTo(*result, tr);
sIdxTempResultDestroy(tr);
return 0;
END:
sIdxTempResultDestroy(tr);
return -1;
}
static void indexInterResultsDestroy(SArray* results) {
if (results == NULL) {
@ -413,43 +398,6 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
return 0;
}
SIdxTempResult* sIdxTempResultCreate() {
SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult));
tr->total = taosArrayInit(4, sizeof(uint64_t));
tr->added = taosArrayInit(4, sizeof(uint64_t));
tr->deled = taosArrayInit(4, sizeof(uint64_t));
return tr;
}
void sIdxTempResultClear(SIdxTempResult* tr) {
if (tr == NULL) {
return;
}
taosArrayClear(tr->total);
taosArrayClear(tr->added);
taosArrayClear(tr->deled);
}
void sIdxTempResultDestroy(SIdxTempResult* tr) {
if (tr == NULL) {
return;
}
taosArrayDestroy(tr->total);
taosArrayDestroy(tr->added);
taosArrayDestroy(tr->deled);
}
static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) {
taosArraySort(tr->total, uidCompare);
taosArraySort(tr->added, uidCompare);
taosArraySort(tr->deled, uidCompare);
SArray* arrs = taosArrayInit(2, sizeof(void*));
taosArrayPush(arrs, &tr->total);
taosArrayPush(arrs, &tr->added);
iUnion(arrs, result);
taosArrayDestroy(arrs);
iExcept(result, tr->deled);
}
static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) {
int32_t sz = taosArrayGetSize(result);
if (sz > 0) {
@ -478,6 +426,7 @@ static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateVal
if (cv != NULL) {
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
uint32_t ver = cv->ver;
if (cv->type == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id)
} else if (cv->type == DEL_VALUE) {

View File

@ -256,7 +256,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
return 0;
}
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) {
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SIdxTempResult* tr, STermValueType* s) {
if (mem == NULL) {
return 0;
}
@ -267,28 +267,23 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
SSkipListNode* node = tSkipListIterGet(iter);
if (node != NULL) {
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
// if (c->operaType == ADD_VALUE) {
//} else if (c->operaType == DEL_VALUE) {
//}
if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
if (strcmp(c->colVal, ct->colVal) == 0) {
taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else {
break;
if (qtype == QUERY_TERM) {
if (0 == strcmp(c->colVal, ct->colVal)) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
// taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
}
}
} else if (c->operaType == DEL_VALUE) {
// table is del, not need
*s = kTypeDeletion;
break;
}
}
}
tSkipListDestroyIter(iter);
return 0;
}
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
if (cache == NULL) {
return 0;
}
@ -416,6 +411,7 @@ static bool indexCacheIteratorNext(Iterate* itera) {
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
iv->type = ct->operaType;
iv->ver = ct->version;
iv->colVal = tstrdup(ct->colVal);
taosArrayPush(iv->val, &ct->uid);

View File

@ -184,12 +184,13 @@ void tfileReaderDestroy(TFileReader* reader) {
free(reader);
}
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) {
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) {
SIndexTerm* term = query->term;
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
EIndexQueryType qtype = query->qType;
int ret = -1;
SArray* result = taosArrayInit(16, sizeof(uint64_t));
int ret = -1;
// refactor to callback later
if (qtype == QUERY_TERM) {
uint64_t offset;
@ -223,6 +224,10 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
// handle later
}
tfileReaderUnRef(reader);
taosArrayAddAll(tr->total, result);
taosArrayDestroy(result);
return ret;
}
@ -248,7 +253,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
tfileGenFileFullName(fullname, path, suid, colName, version);
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size);
indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
if (wc == NULL) {
return NULL;
}
@ -380,7 +385,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
free(tfile);
}
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result) {
int ret = -1;
if (tfile == NULL) {
return ret;
@ -428,6 +433,7 @@ static bool tfileIteratorNext(Iterate* iiter) {
return false;
}
iv->ver = 0;
iv->type = ADD_VALUE; // value in tfile always ADD_VALUE
iv->colVal = colVal;
return true;
@ -628,7 +634,7 @@ static int tfileReaderLoadFst(TFileReader* reader) {
int64_t ts = taosGetTimestampUs();
int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset);
int64_t cost = taosGetTimestampUs() - ts;
indexInfo("nread = %d, and fst offset=%d, size: %d, filename: %s, size: %d, time cost: %" PRId64 "us", nread,
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread,
reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
// we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread <= fstSize);

View File

@ -14,6 +14,7 @@
*/
#include "index_util.h"
#include "index.h"
#include "tcompare.h"
typedef struct MergeIndex {
int idx;
@ -135,3 +136,60 @@ void iExcept(SArray *total, SArray *except) {
taosArrayPopTailBatch(total, tsz - vIdx);
}
int uidCompare(const void *a, const void *b) {
// add more version compare
uint64_t u1 = *(uint64_t *)a;
uint64_t u2 = *(uint64_t *)b;
return u1 - u2;
}
int verdataCompare(const void *a, const void *b) {
SIdxVerdata *va = (SIdxVerdata *)a;
SIdxVerdata *vb = (SIdxVerdata *)b;
int32_t cmp = compareUint64Val(&va->data, &vb->data);
if (cmp == 0) {
cmp = 0 - compareUint32Val(&va->ver, &vb->data);
return cmp;
}
return cmp;
}
SIdxTempResult *sIdxTempResultCreate() {
SIdxTempResult *tr = calloc(1, sizeof(SIdxTempResult));
tr->total = taosArrayInit(4, sizeof(uint64_t));
tr->added = taosArrayInit(4, sizeof(uint64_t));
tr->deled = taosArrayInit(4, sizeof(uint64_t));
return tr;
}
void sIdxTempResultClear(SIdxTempResult *tr) {
if (tr == NULL) {
return;
}
taosArrayClear(tr->total);
taosArrayClear(tr->added);
taosArrayClear(tr->deled);
}
void sIdxTempResultDestroy(SIdxTempResult *tr) {
if (tr == NULL) {
return;
}
taosArrayDestroy(tr->total);
taosArrayDestroy(tr->added);
taosArrayDestroy(tr->deled);
}
void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr) {
taosArraySort(tr->total, uidCompare);
taosArraySort(tr->added, uidCompare);
taosArraySort(tr->deled, uidCompare);
SArray *arrs = taosArrayInit(2, sizeof(void *));
taosArrayPush(arrs, &tr->total);
taosArrayPush(arrs, &tr->added);
iUnion(arrs, result);
taosArrayDestroy(arrs);
iExcept(result, tr->deled);
}

View File

@ -24,6 +24,7 @@
#include "index_fst_counting_writer.h"
#include "index_fst_util.h"
#include "index_tfile.h"
#include "index_util.h"
#include "tskiplist.h"
#include "tutil.h"
using namespace std;
@ -393,7 +394,13 @@ class TFileObj {
//
//
}
return tfileReaderSearch(reader_, query, result);
SIdxTempResult* tr = sIdxTempResultCreate();
int ret = tfileReaderSearch(reader_, query, tr);
sIdxTempResultMergeTo(result, tr);
sIdxTempResultDestroy(tr);
return ret;
}
~TFileObj() {
if (writer_) {
@ -507,9 +514,13 @@ class CacheObj {
indexCacheDebug(cache);
}
int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) {
int ret = indexCacheSearch(cache, query, result, s);
SIdxTempResult* tr = sIdxTempResultCreate();
int ret = indexCacheSearch(cache, query, tr, s);
sIdxTempResultMergeTo(result, tr);
sIdxTempResultDestroy(tr);
if (ret != 0) {
//
std::cout << "failed to get from cache:" << ret << std::endl;
}
return ret;
@ -649,7 +660,7 @@ class IndexObj {
indexInit();
}
int Init(const std::string& dir) {
// taosRemoveDir(dir.c_str());
taosRemoveDir(dir.c_str());
taosMkDir(dir.c_str());
int ret = indexOpen(&opts, dir.c_str(), &idx);
if (ret != 0) {
@ -658,6 +669,14 @@ class IndexObj {
}
return ret;
}
void Del(const std::string& colName, const std::string& colVal, uint64_t uid) {
SIndexTerm* term = indexTermCreate(0, DEL_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
Put(terms, uid);
indexMultiTermDestroy(terms);
}
int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world",
size_t numOfTable = 100 * 10000) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
@ -730,6 +749,7 @@ class IndexObj {
std::cout << "search and time cost:" << e - s << "\tquery col:" << colName << "\t val: " << colVal
<< "\t size:" << taosArrayGetSize(result) << std::endl;
} else {
return -1;
}
int sz = taosArrayGetSize(result);
indexMultiTermQueryDestroy(mq);
@ -797,13 +817,9 @@ class IndexObj {
class IndexEnv2 : public ::testing::Test {
protected:
virtual void SetUp() {
index = new IndexObj();
}
virtual void TearDown() {
delete index;
}
IndexObj* index;
virtual void SetUp() { index = new IndexObj(); }
virtual void TearDown() { delete index; }
IndexObj* index;
};
TEST_F(IndexEnv2, testIndexOpen) {
std::string path = "/tmp/test";
@ -1042,3 +1058,19 @@ TEST_F(IndexEnv2, testIndex_read_performance4) {
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
assert(3 == index->SearchOne("tag10", "Hello"));
}
TEST_F(IndexEnv2, testIndex_del) {
std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {
}
for (int i = 0; i < 100; i++) {
index->PutOneTarge("tag10", "Hello", i);
}
index->Del("tag10", "Hello", 12);
index->Del("tag10", "Hello", 11);
index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000);
EXPECT_EQ(98, index->SearchOne("tag10", "Hello"));
// std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
// assert(3 == index->SearchOne("tag10", "Hello"));
}

View File

@ -28,9 +28,24 @@ typedef struct {
char content[MON_LOG_LEN];
} SMonLogItem;
typedef struct {
int64_t time;
int64_t req_select;
int64_t req_insert;
int64_t req_insert_batch;
int64_t net_in;
int64_t net_out;
int64_t io_read;
int64_t io_write;
int64_t io_read_disk;
int64_t io_write_disk;
} SMonState;
typedef struct SMonInfo {
SArray *logs; // array of SMonLogItem
SJson *pJson;
int64_t curTime;
SMonState lastState;
SArray *logs; // array of SMonLogItem
SJson *pJson;
} SMonInfo;
typedef struct {
@ -39,6 +54,7 @@ typedef struct {
int32_t maxLogs;
const char *server;
uint16_t port;
SMonState state;
} SMonitor;
#ifdef __cplusplus

View File

@ -46,6 +46,7 @@ int32_t monInit(const SMonCfg *pCfg) {
tsMonitor.server = pCfg->server;
tsMonitor.port = pCfg->port;
tsLogFp = monRecordLog;
tsMonitor.state.time = taosGetTimestampMs();
pthread_mutex_init(&tsMonitor.lock, NULL);
return 0;
}
@ -76,20 +77,23 @@ SMonInfo *monCreateMonitorInfo() {
return NULL;
}
pMonitor->curTime = taosGetTimestampMs();
pMonitor->lastState = tsMonitor.state;
return pMonitor;
}
void monCleanupMonitorInfo(SMonInfo *pMonitor) {
tsMonitor.state = pMonitor->lastState;
tsMonitor.state.time = pMonitor->curTime;
taosArrayDestroy(pMonitor->logs);
tjsonDelete(pMonitor->pJson);
free(pMonitor);
}
void monSetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo) {
SJson *pJson = pMonitor->pJson;
int64_t ms = taosGetTimestampMs();
char buf[40] = {0};
taosFormatUtcTime(buf, sizeof(buf), ms, TSDB_TIME_PRECISION_MILLI);
SJson *pJson = pMonitor->pJson;
char buf[40] = {0};
taosFormatUtcTime(buf, sizeof(buf), pMonitor->curTime, TSDB_TIME_PRECISION_MILLI);
tjsonAddStringToObject(pJson, "ts", buf);
tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id);
@ -203,6 +207,27 @@ void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) {
return;
}
SMonState *pLast = &pMonitor->lastState;
double interval = (pMonitor->curTime - pLast->time) / 1000.0;
double req_select_rate = (pInfo->req_select - pLast->req_select) / interval;
double req_insert_rate = (pInfo->req_insert - pLast->req_insert) / interval;
double req_insert_batch_rate = (pInfo->req_insert_batch - pLast->req_insert_batch) / interval;
double net_in_rate = (pInfo->net_in - pLast->net_in) / interval;
double net_out_rate = (pInfo->net_out - pLast->net_out) / interval;
double io_read_rate = (pInfo->io_read - pLast->io_read) / interval;
double io_write_rate = (pInfo->io_write - pLast->io_write) / interval;
double io_read_disk_rate = (pInfo->io_read_disk - pLast->io_read_disk) / interval;
double io_write_disk_rate = (pInfo->io_write_disk - pLast->io_write_disk) / interval;
pLast->req_select = pInfo->req_select;
pLast->req_insert = pInfo->req_insert;
pLast->req_insert_batch = pInfo->req_insert_batch;
pLast->net_in = pInfo->net_in;
pLast->net_out = pInfo->net_out;
pLast->io_read = pInfo->io_read;
pLast->io_write = pInfo->io_write;
pLast->io_read_disk = pInfo->io_read_disk;
pLast->io_write_disk = pInfo->io_write_disk;
tjsonAddDoubleToObject(pJson, "uptime", pInfo->uptime);
tjsonAddDoubleToObject(pJson, "cpu_engine", pInfo->cpu_engine);
tjsonAddDoubleToObject(pJson, "cpu_system", pInfo->cpu_system);
@ -213,20 +238,20 @@ void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) {
tjsonAddDoubleToObject(pJson, "disk_engine", pInfo->disk_engine);
tjsonAddDoubleToObject(pJson, "disk_used", pInfo->disk_used);
tjsonAddDoubleToObject(pJson, "disk_total", pInfo->disk_total);
tjsonAddDoubleToObject(pJson, "net_in", pInfo->net_in);
tjsonAddDoubleToObject(pJson, "net_out", pInfo->net_out);
tjsonAddDoubleToObject(pJson, "io_read", pInfo->io_read);
tjsonAddDoubleToObject(pJson, "io_write", pInfo->io_write);
tjsonAddDoubleToObject(pJson, "io_read_disk", pInfo->io_read_disk);
tjsonAddDoubleToObject(pJson, "io_write_disk", pInfo->io_write_disk);
tjsonAddDoubleToObject(pJson, "net_in", net_in_rate);
tjsonAddDoubleToObject(pJson, "net_out", net_out_rate);
tjsonAddDoubleToObject(pJson, "io_read", io_read_rate);
tjsonAddDoubleToObject(pJson, "io_write", io_write_rate);
tjsonAddDoubleToObject(pJson, "io_read_disk", io_read_disk_rate);
tjsonAddDoubleToObject(pJson, "io_write_disk", io_write_disk_rate);
tjsonAddDoubleToObject(pJson, "req_select", pInfo->req_select);
tjsonAddDoubleToObject(pJson, "req_select_rate", pInfo->req_select_rate);
tjsonAddDoubleToObject(pJson, "req_select_rate", req_select_rate);
tjsonAddDoubleToObject(pJson, "req_insert", pInfo->req_insert);
tjsonAddDoubleToObject(pJson, "req_insert_success", pInfo->req_insert_success);
tjsonAddDoubleToObject(pJson, "req_insert_rate", pInfo->req_insert_rate);
tjsonAddDoubleToObject(pJson, "req_insert_rate", req_insert_rate);
tjsonAddDoubleToObject(pJson, "req_insert_batch", pInfo->req_insert_batch);
tjsonAddDoubleToObject(pJson, "req_insert_batch_success", pInfo->req_insert_batch_success);
tjsonAddDoubleToObject(pJson, "req_insert_batch_rate", pInfo->req_insert_batch_rate);
tjsonAddDoubleToObject(pJson, "req_insert_batch_rate", req_insert_batch_rate);
tjsonAddDoubleToObject(pJson, "errors", pInfo->errors);
tjsonAddDoubleToObject(pJson, "vnodes_num", pInfo->vnodes_num);
tjsonAddDoubleToObject(pJson, "masters", pInfo->masters);

View File

@ -142,13 +142,10 @@ void MonitorTest::GetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) {
pInfo->io_read_disk = 7.1;
pInfo->io_write_disk = 7.2;
pInfo->req_select = 8;
pInfo->req_select_rate = 8.1;
pInfo->req_insert = 9;
pInfo->req_insert_success = 10;
pInfo->req_insert_rate = 10.1;
pInfo->req_insert_batch = 11;
pInfo->req_insert_batch_success = 12;
pInfo->req_insert_batch_rate = 12.3;
pInfo->errors = 4;
pInfo->vnodes_num = 5;
pInfo->masters = 6;

View File

@ -118,7 +118,7 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) {
return 0;
}
int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) {
int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) {
IO_COUNTERS io_counter;
if (GetProcessIoCounters(GetCurrentProcess(), &io_counter)) {
if (rchars) *rchars = io_counter.ReadTransferCount;
@ -135,9 +135,7 @@ void taosGetSystemInfo() {
taosGetTotalMemory(&tsTotalMemoryKB);
double tmp1, tmp2, tmp3, tmp4;
taosGetBandSpeed(&tmp1, &tmp2);
taosGetCpuUsage(&tmp1, &tmp2);
taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4);
}
void taosKillSystem() {
@ -227,7 +225,7 @@ void taosGetSystemInfo() {
tsNumOfCores = sysconf(_SC_NPROCESSORS_ONLN);
}
int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) {
int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) {
if (rchars) *rchars = 0;
if (wchars) *wchars = 0;
if (read_bytes) *read_bytes = 0;
@ -336,7 +334,7 @@ static char tsProcCpuFile[25] = {0};
static char tsProcMemFile[25] = {0};
static char tsProcIOFile[25] = {0};
static void taosGetProcInfos() {
static void taosGetProcIOnfos() {
tsPageSizeKB = sysconf(_SC_PAGESIZE) / 1024;
tsOpenMax = sysconf(_SC_OPEN_MAX);
tsStreamMax = sysconf(_SC_STREAM_MAX);
@ -544,41 +542,7 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) {
return 0;
}
int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec) {
static int64_t last_receive_bytes = 0;
static int64_t last_transmit_bytes = 0;
static int64_t last_time = 0;
int64_t cur_receive_bytes = 0;
int64_t cur_transmit_bytes = 0;
int64_t cur_time = taosGetTimestampMs();
if (taosGetCardInfo(&cur_receive_bytes, &cur_transmit_bytes) != 0) {
return -1;
}
if (last_time == 0 || last_time >= cur_time) {
last_time = cur_time;
last_receive_bytes = cur_receive_bytes;
last_transmit_bytes = cur_transmit_bytes;
*receive_bytes_per_sec = 0;
*transmit_bytes_per_sec = 0;
return 0;
}
*receive_bytes_per_sec = (cur_receive_bytes - last_receive_bytes) / (double)(cur_time - last_time) * 1000;
*transmit_bytes_per_sec = (cur_transmit_bytes - last_transmit_bytes) / (double)(cur_time - last_time) * 1000;
last_time = cur_time;
last_transmit_bytes = cur_transmit_bytes;
last_receive_bytes = cur_receive_bytes;
if (*receive_bytes_per_sec < 0) *receive_bytes_per_sec = 0;
if (*transmit_bytes_per_sec < 0) *transmit_bytes_per_sec = 0;
return 0;
}
int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) {
int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) {
TdFilePtr pFile = taosOpenFile(tsProcIOFile, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) return -1;
@ -620,61 +584,13 @@ int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, in
return 0;
}
int32_t taosGetIOSpeed(double *rchar_per_sec, double *wchar_per_sec, double *read_bytes_per_sec,
double *write_bytes_per_sec) {
static int64_t last_rchar = -1;
static int64_t last_wchar = -1;
static int64_t last_read_bytes = -1;
static int64_t last_write_bytes = -1;
static int64_t last_time = 0;
int64_t cur_rchar = 0;
int64_t cur_wchar = 0;
int64_t cur_read_bytes = 0;
int64_t cur_write_bytes = 0;
int64_t cur_time = taosGetTimestampMs();
if (taosReadProcIO(&cur_rchar, &cur_wchar, &cur_read_bytes, &cur_write_bytes) != 0) {
return -1;
}
if (last_time == 0 || last_time >= cur_time) {
last_time = cur_time;
last_rchar = cur_rchar;
last_wchar = cur_wchar;
last_read_bytes = cur_read_bytes;
last_write_bytes = cur_write_bytes;
return -1;
}
*rchar_per_sec = (cur_rchar - last_rchar) / (double)(cur_time - last_time) * 1000;
*wchar_per_sec = (cur_wchar - last_wchar) / (double)(cur_time - last_time) * 1000;
*read_bytes_per_sec = (cur_read_bytes - last_read_bytes) / (double)(cur_time - last_time) * 1000;
*write_bytes_per_sec = (cur_write_bytes - last_write_bytes) / (double)(cur_time - last_time) * 1000;
last_time = cur_time;
last_rchar = cur_rchar;
last_wchar = cur_wchar;
last_read_bytes = cur_read_bytes;
last_write_bytes = cur_write_bytes;
if (*rchar_per_sec < 0) *rchar_per_sec = 0;
if (*wchar_per_sec < 0) *wchar_per_sec = 0;
if (*read_bytes_per_sec < 0) *read_bytes_per_sec = 0;
if (*write_bytes_per_sec < 0) *write_bytes_per_sec = 0;
return 0;
}
void taosGetSystemInfo() {
taosGetProcInfos();
taosGetProcIOnfos();
taosGetCpuCores(&tsNumOfCores);
taosGetTotalMemory(&tsTotalMemoryKB);
double tmp1, tmp2, tmp3, tmp4;
taosGetBandSpeed(&tmp1, &tmp2);
taosGetCpuUsage(&tmp1, &tmp2);
taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4);
}
void taosKillSystem() {