diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1b08d6f241..d1e02af287 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -671,6 +671,11 @@ typedef struct { int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; + int64_t numOfSelectReqs; + int64_t numOfInsertReqs; + int64_t numOfInsertSuccessReqs; + int64_t numOfBatchInsertReqs; + int64_t numOfBatchInsertSuccessReqs; } SVnodeLoad; typedef struct { @@ -1837,6 +1842,149 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) } return buf; } +typedef enum { + TD_TIME_UNIT_UNKNOWN = -1, + TD_TIME_UNIT_YEAR = 0, + TD_TIME_UNIT_SEASON = 1, + TD_TIME_UNIT_MONTH = 2, + TD_TIME_UNIT_WEEK = 3, + TD_TIME_UNIT_DAY = 4, + TD_TIME_UNIT_HOUR = 5, + TD_TIME_UNIT_MINUTE = 6, + TD_TIME_UNIT_SEC = 7, + TD_TIME_UNIT_MILLISEC = 8, + TD_TIME_UNIT_MICROSEC = 9, + TD_TIME_UNIT_NANOSEC = 10 +} ETDTimeUnit; +typedef struct { + uint8_t version; // for compatibility + uint8_t intervalUnit; + uint8_t slidingUnit; + char indexName[TSDB_INDEX_NAME_LEN + 1]; + col_id_t numOfColIds; + uint16_t numOfFuncIds; + uint64_t tableUid; // super/common table uid + int64_t interval; + int64_t sliding; + col_id_t* colIds; // N.B. sorted column ids + uint16_t* funcIds; // N.B. sorted sma function ids +} STSma; // Time-range-wise SMA + +typedef struct { + uint32_t number; + STSma* tSma; +} STSmaWrapper; + +static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) { + if (pSma) { + tfree(pSma->colIds); + tfree(pSma->funcIds); + if (releaseSelf) { + free(pSma); + } + } +} + +static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) { + if (pSW && pSW->tSma) { + for (uint32_t i = 0; i < pSW->number; ++i) { + tdDestroyTSma(pSW->tSma + i, false); + } + tfree(pSW->tSma); + } +} + +static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) { + int32_t tlen = 0; + + tlen += taosEncodeFixedU8(buf, pSma->version); + tlen += taosEncodeFixedU8(buf, pSma->intervalUnit); + tlen += taosEncodeFixedU8(buf, pSma->slidingUnit); + tlen += taosEncodeString(buf, pSma->indexName); + tlen += taosEncodeFixedU16(buf, pSma->numOfColIds); + tlen += taosEncodeFixedU16(buf, pSma->numOfFuncIds); + tlen += taosEncodeFixedU64(buf, pSma->tableUid); + tlen += taosEncodeFixedI64(buf, pSma->interval); + tlen += taosEncodeFixedI64(buf, pSma->sliding); + + for (col_id_t i = 0; i < pSma->numOfColIds; ++i) { + tlen += taosEncodeFixedU16(buf, *(pSma->colIds + i)); + } + + for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) { + tlen += taosEncodeFixedU16(buf, *(pSma->funcIds + i)); + } + + return tlen; +} + +static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* pSW) { + int32_t tlen = 0; + + tlen += taosEncodeFixedU32(buf, pSW->number); + for (uint32_t i = 0; i < pSW->number; ++i) { + tlen += tEncodeTSma(buf, pSW->tSma + i); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) { + buf = taosDecodeFixedU8(buf, &pSma->version); + buf = taosDecodeFixedU8(buf, &pSma->intervalUnit); + buf = taosDecodeFixedU8(buf, &pSma->slidingUnit); + buf = taosDecodeStringTo(buf, pSma->indexName); + buf = taosDecodeFixedU16(buf, &pSma->numOfColIds); + buf = taosDecodeFixedU16(buf, &pSma->numOfFuncIds); + buf = taosDecodeFixedU64(buf, &pSma->tableUid); + buf = taosDecodeFixedI64(buf, &pSma->interval); + buf = taosDecodeFixedI64(buf, &pSma->sliding); + + if (pSma->numOfColIds > 0) { + pSma->colIds = (col_id_t*)calloc(pSma->numOfColIds, sizeof(STSma)); + if (pSma->colIds == NULL) { + return NULL; + } + for (uint16_t i = 0; i < pSma->numOfColIds; ++i) { + buf = taosDecodeFixedU16(buf, pSma->colIds + i); + } + } else { + pSma->colIds = NULL; + } + + if (pSma->numOfFuncIds > 0) { + pSma->funcIds = (uint16_t*)calloc(pSma->numOfFuncIds, sizeof(STSma)); + if (pSma->funcIds == NULL) { + return NULL; + } + for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) { + buf = taosDecodeFixedU16(buf, pSma->funcIds + i); + } + } else { + pSma->funcIds = NULL; + } + + return buf; +} + +static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) { + buf = taosDecodeFixedU32(buf, &pSW->number); + + pSW->tSma = (STSma*)calloc(pSW->number, sizeof(STSma)); + if (pSW->tSma == NULL) { + return NULL; + } + + for (uint32_t i = 0; i < pSW->number; ++i) { + if ((buf = tDecodeTSma(buf, pSW->tSma + i)) == NULL) { + for (uint32_t j = i; j >= 0; --i) { + tdDestroyTSma(pSW->tSma + j, false); + } + free(pSW->tSma); + return NULL; + } + } + return buf; +} typedef struct { int64_t uid; diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 1695edd983..0c832f802b 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -95,20 +95,17 @@ typedef struct { int64_t disk_engine; // Byte int64_t disk_used; // Byte int64_t disk_total; // Byte - double net_in; // bytes per second - double net_out; // bytes per second - double io_read; - double io_write; - double io_read_disk; - double io_write_disk; - int32_t req_select; - float req_select_rate; - int32_t req_insert; - int32_t req_insert_success; - float req_insert_rate; - int32_t req_insert_batch; - int32_t req_insert_batch_success; - float req_insert_batch_rate; + int64_t net_in; // bytes + int64_t net_out; // bytes + int64_t io_read; // bytes + int64_t io_write; // bytes + int64_t io_read_disk; // bytes + int64_t io_write_disk; // bytes + int64_t req_select; + int64_t req_insert; + int64_t req_insert_success; + int64_t req_insert_batch; + int64_t req_insert_batch_success; int32_t errors; int32_t vnodes_num; int32_t masters; diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 9081fa9715..1ebad370b5 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -43,10 +43,8 @@ int32_t taosGetTotalMemory(int64_t *totalKB); int32_t taosGetProcMemory(int64_t *usedKB); int32_t taosGetSysMemory(int64_t *usedKB); int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize); -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes); -int32_t taosGetIOSpeed(double *readKB, double *writeKB, double *readDiskKB, double *writeDiskKB); +int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes); int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes); -int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec); int32_t taosSystem(const char *cmd); void taosKillSystem(); diff --git a/include/util/tdef.h b/include/util/tdef.h index a1b4fc11cf..9695c2e4c8 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -206,6 +206,7 @@ typedef enum ELogicConditionType { #define TSDB_FUNC_TYPE_AGGREGATE 2 #define TSDB_FUNC_MAX_RETRIEVE 1024 +#define TSDB_INDEX_NAME_LEN 32 #define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index b9acbea02f..13ef101908 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -105,10 +105,19 @@ typedef struct { } SBnodeMgmt; typedef struct { + int32_t openVnodes; + int32_t totalVnodes; + int32_t masterNum; + int64_t numOfSelectReqs; + int64_t numOfInsertReqs; + int64_t numOfInsertSuccessReqs; + int64_t numOfBatchInsertReqs; + int64_t numOfBatchInsertSuccessReqs; +} SVnodesStat; + +typedef struct { + SVnodesStat stat; SHashObj *hash; - int32_t openVnodes; - int32_t totalVnodes; - int32_t masterNum; SRWLatch latch; SQWorkerPool queryPool; SFWorkerPool fetchPool; diff --git a/source/dnode/mgmt/impl/inc/dndMnode.h b/source/dnode/mgmt/impl/inc/dndMnode.h index 0aee3b4b43..0f03bb3832 100644 --- a/source/dnode/mgmt/impl/inc/dndMnode.h +++ b/source/dnode/mgmt/impl/inc/dndMnode.h @@ -34,7 +34,6 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonGrantInfo *pGrantInfo); -int8_t dndIsMnode(SDnode *pDnode); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index ff9eb5d884..1782b2aba0 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -489,20 +489,19 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { pInfo->disk_engine = 0; pInfo->disk_used = tsDataSpace.size.used; pInfo->disk_total = tsDataSpace.size.total; - taosGetBandSpeed(&pInfo->net_in, &pInfo->net_out); - taosGetIOSpeed(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); - pInfo->req_select = 0; - pInfo->req_select_rate = 0; - pInfo->req_insert = 0; - pInfo->req_insert_success = 0; - pInfo->req_insert_rate = 0; - pInfo->req_insert_batch = 0; - pInfo->req_insert_batch_success = 0; - pInfo->req_insert_batch_rate = 0; + taosGetCardInfo(&pInfo->net_in, &pInfo->net_out); + taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); + + SVnodesStat *pStat = &pDnode->vmgmt.stat; + pInfo->req_select = pStat->numOfSelectReqs; + pInfo->req_insert = pStat->numOfInsertReqs; + pInfo->req_insert_success = pStat->numOfInsertSuccessReqs; + pInfo->req_insert_batch = pStat->numOfBatchInsertReqs; + pInfo->req_insert_batch_success = pStat->numOfBatchInsertSuccessReqs; pInfo->errors = tsNumOfErrorLogs; - pInfo->vnodes_num = pDnode->vmgmt.totalVnodes; - pInfo->masters = pDnode->vmgmt.masterNum; - pInfo->has_mnode = dndIsMnode(pDnode); + pInfo->vnodes_num = pStat->totalVnodes; + pInfo->masters = pStat->masterNum; + pInfo->has_mnode = pDnode->mmgmt.deployed; } static void dndSendMonitorReport(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 47e74b5c57..0ea47c89d8 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -640,10 +640,3 @@ int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SM dndReleaseMnode(pDnode, pMnode); return code; } - -int8_t dndIsMnode(SDnode *pDnode) { - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode == NULL) return 0; - dndReleaseMnode(pDnode, pMnode); - return 1; -} \ No newline at end of file diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 28bc615aba..d311e1e417 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -382,7 +382,7 @@ static void *dnodeOpenVnodeFunc(void *param) { char stepDesc[TSDB_STEP_DESC_LEN] = {0}; snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId, - pMgmt->openVnodes, pMgmt->totalVnodes); + pMgmt->stat.openVnodes, pMgmt->stat.totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; @@ -396,7 +396,7 @@ static void *dnodeOpenVnodeFunc(void *param) { pThread->opened++; } - atomic_add_fetch_32(&pMgmt->openVnodes, 1); + atomic_add_fetch_32(&pMgmt->stat.openVnodes, 1); } dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, @@ -422,7 +422,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { return -1; } - pMgmt->totalVnodes = numOfVnodes; + pMgmt->stat.totalVnodes = numOfVnodes; int32_t threadNum = tsNumOfCores; #if 1 @@ -470,11 +470,11 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { free(threads); free(pCfgs); - if (pMgmt->openVnodes != pMgmt->totalVnodes) { - dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes); + if (pMgmt->stat.openVnodes != pMgmt->stat.totalVnodes) { + dError("there are total vnodes:%d, opened:%d", pMgmt->stat.totalVnodes, pMgmt->stat.openVnodes); return -1; } else { - dInfo("total vnodes:%d open successfully", pMgmt->totalVnodes); + dInfo("total vnodes:%d open successfully", pMgmt->stat.totalVnodes); return 0; } } @@ -980,13 +980,18 @@ void dndCleanupVnodes(SDnode *pDnode) { void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; + SVnodesStat *pStat = &pMgmt->stat; int32_t totalVnodes = 0; int32_t masterNum = 0; + int64_t numOfSelectReqs = 0; + int64_t numOfInsertReqs = 0; + int64_t numOfInsertSuccessReqs = 0; + int64_t numOfBatchInsertReqs = 0; + int64_t numOfBatchInsertSuccessReqs = 0; taosRLockLatch(&pMgmt->latch); - int32_t v = 0; - void *pIter = taosHashIterate(pMgmt->hash, NULL); + void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL || *ppVnode == NULL) continue; @@ -996,12 +1001,24 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { vnodeGetLoad(pVnode->pImpl, &vload); taosArrayPush(pLoads, &vload); + numOfSelectReqs += vload.numOfSelectReqs; + numOfInsertReqs += vload.numOfInsertReqs; + numOfInsertSuccessReqs += vload.numOfInsertSuccessReqs; + numOfBatchInsertReqs += vload.numOfBatchInsertReqs; + numOfBatchInsertSuccessReqs += vload.numOfBatchInsertSuccessReqs; totalVnodes++; if (vload.role == TAOS_SYNC_STATE_LEADER) masterNum++; + pIter = taosHashIterate(pMgmt->hash, pIter); } taosRUnLockLatch(&pMgmt->latch); - pMgmt->totalVnodes = totalVnodes; - pMgmt->masterNum = masterNum; + + pStat->totalVnodes = totalVnodes; + pStat->masterNum = masterNum; + pStat->numOfSelectReqs = numOfSelectReqs; + pStat->numOfInsertReqs = numOfInsertReqs; + pStat->numOfInsertSuccessReqs = numOfInsertSuccessReqs; + pStat->numOfBatchInsertReqs = numOfBatchInsertReqs; + pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; } diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 3c0d45df63..f026f8331b 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -54,5 +54,5 @@ elseif(${META_DB_IMPL} STREQUAL "TDB") endif() if(${BUILD_TEST}) - # add_subdirectory(test) + add_subdirectory(test) endif(${BUILD_TEST}) diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index fd079b8f32..e5ad43a4ee 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -38,8 +38,10 @@ typedef struct SMetaCfg { typedef struct SMTbCursor SMTbCursor; typedef struct SMCtbCursor SMCtbCursor; +typedef struct SMSmaCursor SMSmaCursor; typedef SVCreateTbReq STbCfg; +typedef STSma SSmaCfg; // SMeta operations SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); @@ -50,19 +52,24 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid); int metaCommit(SMeta *pMeta); // For Query -STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); -STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); +STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); +STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); -STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); +STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); +SSmaCfg * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName); SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); -char *metaTbCursorNext(SMTbCursor *pTbCur); +char * metaTbCursorNext(SMTbCursor *pTbCur); SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur); +SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid); +void metaCloseSmaCurosr(SMSmaCursor *pSmaCur); +const char * metaSmaCursorNext(SMSmaCursor *pSmaCur); + // Options void metaOptionsInit(SMetaCfg *pMetaCfg); void metaOptionsClear(SMetaCfg *pMetaCfg); diff --git a/source/dnode/vnode/src/inc/metaDef.h b/source/dnode/vnode/src/inc/metaDef.h index 71bfd91356..6b4c036b39 100644 --- a/source/dnode/vnode/src/inc/metaDef.h +++ b/source/dnode/vnode/src/inc/metaDef.h @@ -33,6 +33,8 @@ int metaOpenDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta); int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg); int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); +int metaSaveSmaToDB(SMeta* pMeta, SSmaCfg* pTbCfg); +int metaRemoveSmaFromDb(SMeta* pMeta, const char* indexName); // SMetaCache int metaOpenCache(SMeta* pMeta); diff --git a/source/dnode/vnode/src/inc/tsdbCommit.h b/source/dnode/vnode/src/inc/tsdbCommit.h index 9c35d06880..699aeaa133 100644 --- a/source/dnode/vnode/src/inc/tsdbCommit.h +++ b/source/dnode/vnode/src/inc/tsdbCommit.h @@ -16,6 +16,10 @@ #ifndef _TD_TSDB_COMMIT_H_ #define _TD_TSDB_COMMIT_H_ +#ifdef __cplusplus +extern "C" { +#endif + typedef struct { int minFid; int midFid; @@ -66,4 +70,8 @@ int tsdbApplyRtn(STsdbRepo *pRepo); #endif +#ifdef __cplusplus +} +#endif + #endif /* _TD_TSDB_COMMIT_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/tsdbFS.h b/source/dnode/vnode/src/inc/tsdbFS.h index dab697ce8b..71f35a9eca 100644 --- a/source/dnode/vnode/src/inc/tsdbFS.h +++ b/source/dnode/vnode/src/inc/tsdbFS.h @@ -18,6 +18,10 @@ #include "tsdbFile.h" +#ifdef __cplusplus +extern "C" { +#endif + // ================== TSDB global config extern bool tsdbForceKeepFile; @@ -111,4 +115,8 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { return 0; } +#ifdef __cplusplus +} +#endif + #endif /* _TD_TSDB_FS_H_ */ diff --git a/source/dnode/vnode/src/inc/tsdbFile.h b/source/dnode/vnode/src/inc/tsdbFile.h index bbeb6c6a0f..1034ae015a 100644 --- a/source/dnode/vnode/src/inc/tsdbFile.h +++ b/source/dnode/vnode/src/inc/tsdbFile.h @@ -19,6 +19,10 @@ #include "tchecksum.h" #include "tfs.h" +#ifdef __cplusplus +extern "C" { +#endif + #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF @@ -410,4 +414,8 @@ static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) { return true; } +#ifdef __cplusplus +} +#endif + #endif /* _TS_TSDB_FILE_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/tsdbLog.h b/source/dnode/vnode/src/inc/tsdbLog.h index 6ab17ec587..56dc8ab2a0 100644 --- a/source/dnode/vnode/src/inc/tsdbLog.h +++ b/source/dnode/vnode/src/inc/tsdbLog.h @@ -18,6 +18,10 @@ #include "tlog.h" +#ifdef __cplusplus +extern "C" { +#endif + extern int32_t tsdbDebugFlag; #define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) @@ -27,4 +31,8 @@ extern int32_t tsdbDebugFlag; #define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#ifdef __cplusplus +} +#endif + #endif /* _TD_TSDB_LOG_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/tsdbMemory.h b/source/dnode/vnode/src/inc/tsdbMemory.h index 1fc4cd9e52..df4df0053f 100644 --- a/source/dnode/vnode/src/inc/tsdbMemory.h +++ b/source/dnode/vnode/src/inc/tsdbMemory.h @@ -16,6 +16,10 @@ #ifndef _TD_TSDB_MEMORY_H_ #define _TD_TSDB_MEMORY_H_ +#ifdef __cplusplus +extern "C" { +#endif + static void * taosTMalloc(size_t size); static void * taosTCalloc(size_t nmemb, size_t size); static void * taosTRealloc(void *ptr, size_t size); @@ -70,5 +74,8 @@ static FORCE_INLINE void* taosTZfree(void* ptr) { return NULL; } +#ifdef __cplusplus +} +#endif #endif /* _TD_TSDB_MEMORY_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/tsdbReadImpl.h b/source/dnode/vnode/src/inc/tsdbReadImpl.h index 16eb55967c..cd24358b27 100644 --- a/source/dnode/vnode/src/inc/tsdbReadImpl.h +++ b/source/dnode/vnode/src/inc/tsdbReadImpl.h @@ -24,6 +24,10 @@ #include "tsdbMemory.h" #include "tcommon.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct SReadH SReadH; typedef struct { @@ -244,4 +248,8 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { return 0; } +#ifdef __cplusplus +} +#endif + #endif /*_TD_TSDB_READ_IMPL_H_*/ diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index f75d9d3db0..c31f28d983 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -38,11 +38,14 @@ struct SMetaDB { // DB DB *pTbDB; DB *pSchemaDB; + DB *pSmaDB; + // IDX DB *pNameIdx; DB *pStbIdx; DB *pNtbIdx; DB *pCtbIdx; + DB *pSmaIdx; // ENV DB_ENV *pEvn; }; @@ -61,6 +64,7 @@ static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); +static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); static void metaClearTbCfg(STbCfg *pTbCfg); @@ -100,6 +104,11 @@ int metaOpenDB(SMeta *pMeta) { return -1; } + if (metaOpenBDBDb(&(pDB->pSmaDB), pDB->pEvn, "sma.db", false) < 0) { + metaCloseDB(pMeta); + return -1; + } + // Open Indices if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "name.index", pDB->pTbDB, &metaNameIdxCb, false) < 0) { metaCloseDB(pMeta); @@ -121,15 +130,22 @@ int metaOpenDB(SMeta *pMeta) { return -1; } + if (metaOpenBDBIdx(&(pDB->pSmaIdx), pDB->pEvn, "sma.index", pDB->pSmaDB, &metaSmaIdxCb, true) < 0) { + metaCloseDB(pMeta); + return -1; + } + return 0; } void metaCloseDB(SMeta *pMeta) { if (pMeta->pDB) { + metaCloseBDBIdx(pMeta->pDB->pSmaIdx); metaCloseBDBIdx(pMeta->pDB->pCtbIdx); metaCloseBDBIdx(pMeta->pDB->pNtbIdx); metaCloseBDBIdx(pMeta->pDB->pStbIdx); metaCloseBDBIdx(pMeta->pDB->pNameIdx); + metaCloseBDBDb(pMeta->pDB->pSmaDB); metaCloseBDBDb(pMeta->pDB->pSchemaDB); metaCloseBDBDb(pMeta->pDB->pTbDB); metaCloseBDBEnv(pMeta->pDB->pEvn); @@ -210,6 +226,49 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { return 0; } +int metaSaveSmaToDB(SMeta *pMeta, SSmaCfg *pSmaCfg) { + char buf[512] = {0}; // TODO: may overflow + void *pBuf = NULL; + DBT key1 = {0}, value1 = {0}; + + { + // save sma info + pBuf = buf; + + key1.data = pSmaCfg->indexName; + key1.size = strlen(key1.data); + + tEncodeTSma(&pBuf, pSmaCfg); + + value1.data = buf; + value1.size = POINTER_DISTANCE(pBuf, buf); + value1.app_data = pSmaCfg; + } + + metaDBWLock(pMeta->pDB); + pMeta->pDB->pSmaDB->put(pMeta->pDB->pSmaDB, NULL, &key1, &value1, 0); + metaDBULock(pMeta->pDB); + + return 0; +} + +int metaRemoveSmaFromDb(SMeta *pMeta, const char *indexName) { + // TODO +#if 0 + DBT key = {0}; + + key.data = (void *)indexName; + key.size = strlen(indexName); + + metaDBWLock(pMeta->pDB); + // TODO: No guarantee of consistence. + // Use transaction or DB->sync() for some guarantee. + pMeta->pDB->pSmaDB->del(pMeta->pDB->pSmaDB, NULL, &key, 0); + metaDBULock(pMeta->pDB); +#endif + return 0; +} + /* ------------------------ STATIC METHODS ------------------------ */ static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) { int tlen = 0; @@ -425,6 +484,16 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey } } +static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { + SSmaCfg *pSmaCfg = (SSmaCfg *)(pValue->app_data); + + memset(pSKey, 0, sizeof(*pSKey)); + pSKey->data = &(pSmaCfg->tableUid); + pSKey->size = sizeof(pSmaCfg->tableUid); + + return 0; +} + static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { int tsize = 0; @@ -540,6 +609,36 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { return pTbCfg; } +SSmaCfg *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) { + SSmaCfg *pCfg = NULL; + SMetaDB *pDB = pMeta->pDB; + DBT key = {0}; + DBT value = {0}; + int ret; + + // Set key/value + key.data = (void *)indexName; + key.size = strlen(indexName); + + // Query + metaDBRLock(pDB); + ret = pDB->pTbDB->get(pDB->pSmaDB, NULL, &key, &value, 0); + metaDBULock(pDB); + if (ret != 0) { + return NULL; + } + + // Decode + pCfg = (SSmaCfg *)malloc(sizeof(SSmaCfg)); + if (pCfg == NULL) { + return NULL; + } + + tDecodeTSma(value.data, pCfg); + + return pCfg; +} + SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { uint32_t nCols; SSchemaWrapper *pSW = NULL; @@ -718,6 +817,61 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { } } +struct SMSmaCursor { + DBC *pCur; + tb_uid_t uid; +}; + +SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) { + SMSmaCursor *pCur = NULL; + SMetaDB *pDB = pMeta->pDB; + int ret; + + pCur = (SMSmaCursor *)calloc(1, sizeof(*pCur)); + if (pCur == NULL) { + return NULL; + } + + pCur->uid = uid; + ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &(pCur->pCur), 0); + if (ret != 0) { + free(pCur); + return NULL; + } + + return pCur; +} + +void metaCloseSmaCurosr(SMSmaCursor *pCur) { + if (pCur) { + if (pCur->pCur) { + pCur->pCur->close(pCur->pCur); + } + + free(pCur); + } +} + +const char* metaSmaCursorNext(SMSmaCursor *pCur) { + DBT skey = {0}; + DBT pkey = {0}; + DBT pval = {0}; + void *pBuf; + + // Set key + skey.data = &(pCur->uid); + skey.size = sizeof(pCur->uid); + + if (pCur->pCur->pget(pCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) { + const char* indexName = (const char *)pkey.data; + assert(indexName != NULL); + return indexName; + } else { + return 0; + } +} + + static void metaDBWLock(SMetaDB *pDB) { #if IMPL_WITH_LOCK pthread_rwlock_wrlock(&(pDB->rwlock)); diff --git a/source/dnode/vnode/src/meta/metaIdx.c b/source/dnode/vnode/src/meta/metaIdx.c index d9abb4bb7b..2ca02a2b80 100644 --- a/source/dnode/vnode/src/meta/metaIdx.c +++ b/source/dnode/vnode/src/meta/metaIdx.c @@ -106,3 +106,20 @@ int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) { // TODO return 0; } + +int metaCreateSma(SMeta *pMeta, SSmaCfg *pSmaCfg) { + // Validate the tbOptions + // if (metaValidateTbCfg(pMeta, pTbCfg) < 0) { + // // TODO: handle error + // return -1; + // } + + // TODO: add atomicity + + if (metaSaveSmaToDB(pMeta, pSmaCfg) < 0) { + // TODO: handle error + return -1; + } + + return 0; +} diff --git a/source/dnode/vnode/src/vnd/vnodeInt.c b/source/dnode/vnode/src/vnd/vnodeInt.c index 6d3fa5f7f3..7d0b594e95 100644 --- a/source/dnode/vnode/src/vnd/vnodeInt.c +++ b/source/dnode/vnode/src/vnd/vnodeInt.c @@ -32,6 +32,11 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->totalStorage = 300; pLoad->compStorage = 200; pLoad->pointsWritten = 100; + pLoad->numOfSelectReqs = 1; + pLoad->numOfInsertReqs = 3; + pLoad->numOfInsertSuccessReqs = 2; + pLoad->numOfBatchInsertReqs = 5; + pLoad->numOfBatchInsertSuccessReqs = 4; return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 81eb09f48f..a2616307ff 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -41,10 +41,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { return 0; } -int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - SVCreateTbReq vCreateTbReq; - SVCreateTbBatchReq vCreateTbBatchReq; - void *ptr = NULL; +int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + void *ptr = NULL; if (pVnode->config.streamMode == 0) { ptr = vnodeMalloc(pVnode, pMsg->contLen); @@ -64,7 +62,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } switch (pMsg->msgType) { - case TDMT_VND_CREATE_STB: + case TDMT_VND_CREATE_STB: { + SVCreateTbReq vCreateTbReq = {0}; tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { // TODO: handle error @@ -75,7 +74,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { free(vCreateTbReq.stbCfg.pTagSchema); free(vCreateTbReq.name); break; - case TDMT_VND_CREATE_TABLE: + } + case TDMT_VND_CREATE_TABLE: { + SVCreateTbBatchReq vCreateTbBatchReq = {0}; tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq); for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) { SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); @@ -97,14 +98,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); taosArrayDestroy(vCreateTbBatchReq.pArray); break; - - case TDMT_VND_ALTER_STB: + } + case TDMT_VND_ALTER_STB: { + SVCreateTbReq vAlterTbReq = {0}; vTrace("vgId:%d, process alter stb req", pVnode->vgId); - tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); - free(vCreateTbReq.stbCfg.pSchema); - free(vCreateTbReq.stbCfg.pTagSchema); - free(vCreateTbReq.name); + tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq); + free(vAlterTbReq.stbCfg.pSchema); + free(vAlterTbReq.stbCfg.pTagSchema); + free(vAlterTbReq.name); break; + } case TDMT_VND_DROP_STB: vTrace("vgId:%d, process drop stb req", pVnode->vgId); break; diff --git a/source/dnode/vnode/test/CMakeLists.txt b/source/dnode/vnode/test/CMakeLists.txt index 6b468497d5..af123a3133 100644 --- a/source/dnode/vnode/test/CMakeLists.txt +++ b/source/dnode/vnode/test/CMakeLists.txt @@ -1,20 +1,39 @@ -add_executable(tqTest "") -target_sources(tqTest - PRIVATE - "tqMetaTest.cpp" -) -target_include_directories(tqTest - PUBLIC - "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +MESSAGE(STATUS "vnode unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +# add_executable(tqTest "") +# target_sources(tqTest +# PRIVATE +# "tqMetaTest.cpp" +# ) +# target_include_directories(tqTest +# PUBLIC +# "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" +# "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +# ) + +# target_link_libraries(tqTest +# tq +# gtest_main +# ) +# enable_testing() +# add_test( +# NAME tq_test +# COMMAND tqTest +# ) + +ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp) +TARGET_LINK_LIBRARIES( + tsdbSmaTest + PUBLIC os util common vnode gtest_main ) -target_link_libraries(tqTest - tq - gtest_main -) -enable_testing() -add_test( - NAME tq_test - COMMAND tqTest -) +TARGET_INCLUDE_DIRECTORIES( + tsdbSmaTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/common" + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../src/inc" + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) \ No newline at end of file diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp new file mode 100644 index 0000000000..986986aa70 --- /dev/null +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include + +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +TEST(testCase, tSmaEncodeDecodeTest) { + // encode + STSma tSma = {0}; + tSma.version = 0; + tSma.intervalUnit = TD_TIME_UNIT_DAY; + tSma.interval = 1; + tSma.slidingUnit = TD_TIME_UNIT_HOUR; + tSma.sliding = 0; + tstrncpy(tSma.indexName, "sma_index_test", TSDB_INDEX_NAME_LEN); + tSma.tableUid = 1234567890; + tSma.numOfColIds = 2; + tSma.numOfFuncIds = 5; // sum/min/max/avg/last + tSma.colIds = (col_id_t *)calloc(tSma.numOfColIds, sizeof(col_id_t)); + tSma.funcIds = (uint16_t *)calloc(tSma.numOfFuncIds, sizeof(uint16_t)); + + for (int32_t i = 0; i < tSma.numOfColIds; ++i) { + *(tSma.colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID); + } + for (int32_t i = 0; i < tSma.numOfFuncIds; ++i) { + *(tSma.funcIds + i) = (i + 2); + } + + STSmaWrapper tSmaWrapper = {.number = 1, .tSma = &tSma}; + uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper); + + void *buf = calloc(bufLen, 1); + assert(buf != NULL); + + STSmaWrapper *pSW = (STSmaWrapper *)buf; + uint32_t len = tEncodeTSmaWrapper(&buf, &tSmaWrapper); + + EXPECT_EQ(len, bufLen); + + // decode + STSmaWrapper dstTSmaWrapper = {0}; + void * result = tDecodeTSmaWrapper(pSW, &dstTSmaWrapper); + assert(result != NULL); + + EXPECT_EQ(tSmaWrapper.number, dstTSmaWrapper.number); + + for (int i = 0; i < tSmaWrapper.number; ++i) { + STSma *pSma = tSmaWrapper.tSma + i; + STSma *qSma = dstTSmaWrapper.tSma + i; + + EXPECT_EQ(pSma->version, qSma->version); + EXPECT_EQ(pSma->intervalUnit, qSma->intervalUnit); + EXPECT_EQ(pSma->slidingUnit, qSma->slidingUnit); + EXPECT_STRCASEEQ(pSma->indexName, qSma->indexName); + EXPECT_EQ(pSma->numOfColIds, qSma->numOfColIds); + EXPECT_EQ(pSma->numOfFuncIds, qSma->numOfFuncIds); + EXPECT_EQ(pSma->tableUid, qSma->tableUid); + EXPECT_EQ(pSma->interval, qSma->interval); + EXPECT_EQ(pSma->sliding, qSma->sliding); + for (uint32_t j = 0; j < pSma->numOfColIds; ++j) { + EXPECT_EQ(*(col_id_t *)(pSma->colIds + j), *(col_id_t *)(qSma->colIds + j)); + } + for (uint32_t j = 0; j < pSma->numOfFuncIds; ++j) { + EXPECT_EQ(*(uint16_t *)(pSma->funcIds + j), *(uint16_t *)(qSma->funcIds + j)); + } + } + + // resource release + tdDestroyTSma(&tSma, false); + tdDestroyTSmaWrapper(&dstTSmaWrapper); +} + +TEST(testCase, tSma_DB_Put_Get_Del_Test) { + const char *smaIndexName1 = "sma_index_test_1"; + const char *smaIndexName2 = "sma_index_test_2"; + const char *smaTestDir = "./smaTest"; + const uint64_t tbUid = 1234567890; + // encode + STSma tSma = {0}; + tSma.version = 0; + tSma.intervalUnit = TD_TIME_UNIT_DAY; + tSma.interval = 1; + tSma.slidingUnit = TD_TIME_UNIT_HOUR; + tSma.sliding = 0; + tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN); + tSma.tableUid = tbUid; + tSma.numOfColIds = 2; + tSma.numOfFuncIds = 5; // sum/min/max/avg/last + tSma.colIds = (col_id_t *)calloc(tSma.numOfColIds, sizeof(col_id_t)); + tSma.funcIds = (uint16_t *)calloc(tSma.numOfFuncIds, sizeof(uint16_t)); + + for (int32_t i = 0; i < tSma.numOfColIds; ++i) { + *(tSma.colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID); + } + for (int32_t i = 0; i < tSma.numOfFuncIds; ++i) { + *(tSma.funcIds + i) = (i + 2); + } + + SMeta * pMeta = NULL; + SSmaCfg * pSmaCfg = &tSma; + const SMetaCfg *pMetaCfg = &defaultMetaOptions; + + taosRemoveDir(smaTestDir); + + pMeta = metaOpen(smaTestDir, pMetaCfg, NULL); + assert(pMeta != NULL); + // save index 1 + metaSaveSmaToDB(pMeta, pSmaCfg); + + tstrncpy(pSmaCfg->indexName, smaIndexName2, TSDB_INDEX_NAME_LEN); + pSmaCfg->version = 1; + pSmaCfg->intervalUnit = TD_TIME_UNIT_HOUR; + pSmaCfg->interval = 1; + pSmaCfg->slidingUnit = TD_TIME_UNIT_MINUTE; + pSmaCfg->sliding = 5; + + // save index 2 + metaSaveSmaToDB(pMeta, pSmaCfg); + + // get value by indexName + SSmaCfg *qSmaCfg = NULL; + qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName1); + assert(qSmaCfg != NULL); + printf("name1 = %s\n", qSmaCfg->indexName); + EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1); + EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid); + tdDestroyTSma(qSmaCfg, true); + + qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName2); + assert(qSmaCfg != NULL); + printf("name2 = %s\n", qSmaCfg->indexName); + EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2); + EXPECT_EQ(qSmaCfg->interval, tSma.interval); + tdDestroyTSma(qSmaCfg, true); + + // get value by table uid + SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid); + assert(pSmaCur != NULL); + uint32_t indexCnt = 0; + while (1) { + const char* indexName = metaSmaCursorNext(pSmaCur); + if (indexName == NULL) { + break; + } + printf("indexName = %s\n", indexName); + ++indexCnt; + } + EXPECT_EQ(indexCnt, 2); + metaCloseSmaCurosr(pSmaCur); + + // resource release + metaRemoveSmaFromDb(pMeta, smaIndexName1); + metaRemoveSmaFromDb(pMeta, smaIndexName2); + + tdDestroyTSma(&tSma, false); + metaClose(pMeta); +} + +#if 0 +TEST(testCase, tSmaInsertTest) { + STSma tSma = {0}; + STSmaData* pSmaData = NULL; + STsdb tsdb = {0}; + + // init + tSma.intervalUnit = TD_TIME_UNIT_DAY; + tSma.interval = 1; + tSma.numOfFuncIds = 5; // sum/min/max/avg/last + + int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t); + int32_t numOfColIds = 3; + int32_t numOfSmaBlocks = 10; + + int32_t dataLen = numOfColIds * numOfSmaBlocks * blockSize; + + pSmaData = (STSmaData*)malloc(sizeof(STSmaData) + dataLen); + ASSERT_EQ(pSmaData != NULL, true); + pSmaData->tableUid = 3232329230; + pSmaData->numOfColIds = numOfColIds; + pSmaData->numOfSmaBlocks = numOfSmaBlocks; + pSmaData->dataLen = dataLen; + pSmaData->tsWindow.skey = 1640000000; + pSmaData->tsWindow.ekey = 1645788649; + pSmaData->colIds = (col_id_t*)malloc(sizeof(col_id_t) * numOfColIds); + ASSERT_EQ(pSmaData->colIds != NULL, true); + + for (int32_t i = 0; i < numOfColIds; ++i) { + *(pSmaData->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID); + } + + // execute + EXPECT_EQ(tsdbInsertTSmaData(&tsdb, &tSma, pSmaData), TSDB_CODE_SUCCESS); + + // release + tdDestroySmaData(pSmaData); +} +#endif + +#pragma GCC diagnostic pop \ No newline at end of file diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index d47954ac1f..3d1d5356c2 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -96,8 +96,10 @@ typedef struct SIndexTermQuery { typedef struct Iterate Iterate; typedef struct IterateValue { - int8_t type; // opera type, ADD_VALUE/DELETE_VALUE - char* colVal; + int8_t type; // opera type, ADD_VALUE/DELETE_VALUE + uint64_t ver; // data ver, tfile data version is 0 + char* colVal; + SArray* val; } IterateValue; diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 1445a1bc56..a6ebcd6d6f 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -16,6 +16,7 @@ #define __INDEX_CACHE_H__ #include "indexInt.h" +#include "index_util.h" #include "tskiplist.h" // ----------------- key structure in skiplist --------------------- @@ -52,8 +53,9 @@ typedef struct CacheTerm { char* colVal; int32_t version; // value - uint64_t uid; - int8_t colType; + uint64_t uid; + int8_t colType; + SIndexOperOnColumn operaType; } CacheTerm; // @@ -68,7 +70,7 @@ void indexCacheIteratorDestroy(Iterate* iiter); int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid); // int indexCacheGet(void *cache, uint64_t *rst); -int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s); +int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s); void indexCacheRef(IndexCache* cache); void indexCacheUnRef(IndexCache* cache); diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index f676651e52..3794898d3a 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -19,6 +19,7 @@ #include "index_fst.h" #include "index_fst_counting_writer.h" #include "index_tfile.h" +#include "index_util.h" #include "tlockfree.h" #ifdef __cplusplus @@ -103,7 +104,7 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName); TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName); TFileReader* tfileReaderCreate(WriterCtx* ctx); void tfileReaderDestroy(TFileReader* reader); -int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result); +int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr); void tfileReaderRef(TFileReader* reader); void tfileReaderUnRef(TFileReader* reader); @@ -118,7 +119,7 @@ int tfileWriterFinish(TFileWriter* tw); IndexTFile* indexTFileCreate(const char* path); void indexTFileDestroy(IndexTFile* tfile); int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid); -int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result); +int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* tr); Iterate* tfileIteratorCreate(TFileReader* reader); void tfileIteratorDestroy(Iterate* iterator); diff --git a/source/libs/index/inc/index_util.h b/source/libs/index/inc/index_util.h index 313839bf1d..814d61afd7 100644 --- a/source/libs/index/inc/index_util.h +++ b/source/libs/index/inc/index_util.h @@ -47,6 +47,19 @@ extern "C" { buf += len; \ } while (0) +#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ + { \ + bool f = false; \ + for (int i = 0; i < taosArrayGetSize(src); i++) { \ + if (*(uint64_t *)taosArrayGet(src, i) == tgt) { \ + f = true; \ + } \ + } \ + if (f == false) { \ + taosArrayPush(dst, &tgt); \ + } \ + } + /* multi sorted result intersection * input: [1, 2, 4, 5] * [2, 3, 4, 5] @@ -66,10 +79,32 @@ void iUnion(SArray *interResults, SArray *finalResult); /* sorted array * total: [1, 2, 4, 5, 7, 8] * except: [4, 5] - * return: [1, 2, 7, 8] + * return: [1, 2, 7, 8] saved in total */ void iExcept(SArray *total, SArray *except); + +int uidCompare(const void *a, const void *b); + +// data with ver +typedef struct { + uint32_t ver; + uint64_t data; +} SIdxVerdata; + +typedef struct { + SArray *total; + SArray *added; + SArray *deled; +} SIdxTempResult; + +SIdxTempResult *sIdxTempResultCreate(); + +void sIdxTempResultClear(SIdxTempResult *tr); + +void sIdxTempResultDestroy(SIdxTempResult *tr); + +void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr); #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 75dc05fc5b..ae0a6c775e 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -31,18 +31,6 @@ void* indexQhandle = NULL; -#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ - { \ - bool f = false; \ - for (int i = 0; i < taosArrayGetSize(src); i++) { \ - if (*(uint64_t*)taosArrayGet(src, i) == tgt) { \ - f = true; \ - } \ - } \ - if (f == false) { \ - taosArrayPush(dst, &tgt); \ - } \ - } void indexInit() { // refactor later indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); @@ -52,23 +40,11 @@ void indexCleanUp() { taosCleanUpScheduler(indexQhandle); } -static int uidCompare(const void* a, const void* b) { - // add more version compare - uint64_t u1 = *(uint64_t*)a; - uint64_t u2 = *(uint64_t*)b; - return u1 - u2; -} typedef struct SIdxColInfo { int colId; // generated by index internal int cVersion; } SIdxColInfo; -typedef struct SIdxTempResult { - SArray* total; - SArray* added; - SArray* deled; -} SIdxTempResult; - static pthread_once_t isInit = PTHREAD_ONCE_INIT; // static void indexInit(); static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result); @@ -255,6 +231,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { #ifdef USE_INVERTED_INDEX + #endif return 1; @@ -363,22 +340,30 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result *result = taosArrayInit(4, sizeof(uint64_t)); // TODO: iterator mem and tidex STermValueType s = kTypeValue; - if (0 == indexCacheSearch(cache, query, *result, &s)) { + + SIdxTempResult* tr = sIdxTempResultCreate(); + if (0 == indexCacheSearch(cache, query, tr, &s)) { if (s == kTypeDeletion) { indexInfo("col: %s already drop by", term->colName); // coloum already drop by other oper, no need to query tindex return 0; } else { - if (0 != indexTFileSearch(sIdx->tindex, query, *result)) { + if (0 != indexTFileSearch(sIdx->tindex, query, tr)) { indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal); - return -1; + goto END; } } } else { indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal); - return -1; + goto END; } + + sIdxTempResultMergeTo(*result, tr); + sIdxTempResultDestroy(tr); return 0; +END: + sIdxTempResultDestroy(tr); + return -1; } static void indexInterResultsDestroy(SArray* results) { if (results == NULL) { @@ -413,43 +398,6 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType return 0; } -SIdxTempResult* sIdxTempResultCreate() { - SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult)); - tr->total = taosArrayInit(4, sizeof(uint64_t)); - tr->added = taosArrayInit(4, sizeof(uint64_t)); - tr->deled = taosArrayInit(4, sizeof(uint64_t)); - return tr; -} -void sIdxTempResultClear(SIdxTempResult* tr) { - if (tr == NULL) { - return; - } - taosArrayClear(tr->total); - taosArrayClear(tr->added); - taosArrayClear(tr->deled); -} -void sIdxTempResultDestroy(SIdxTempResult* tr) { - if (tr == NULL) { - return; - } - taosArrayDestroy(tr->total); - taosArrayDestroy(tr->added); - taosArrayDestroy(tr->deled); -} -static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) { - taosArraySort(tr->total, uidCompare); - taosArraySort(tr->added, uidCompare); - taosArraySort(tr->deled, uidCompare); - - SArray* arrs = taosArrayInit(2, sizeof(void*)); - taosArrayPush(arrs, &tr->total); - taosArrayPush(arrs, &tr->added); - - iUnion(arrs, result); - taosArrayDestroy(arrs); - - iExcept(result, tr->deled); -} static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) { int32_t sz = taosArrayGetSize(result); if (sz > 0) { @@ -478,6 +426,7 @@ static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateVal if (cv != NULL) { uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0); + uint32_t ver = cv->ver; if (cv->type == ADD_VALUE) { INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id) } else if (cv->type == DEL_VALUE) { diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 1ac72e10a9..d3b25afdbc 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -256,7 +256,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u return 0; } -static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) { +static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SIdxTempResult* tr, STermValueType* s) { if (mem == NULL) { return 0; } @@ -267,28 +267,23 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA SSkipListNode* node = tSkipListIterGet(iter); if (node != NULL) { CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); - // if (c->operaType == ADD_VALUE) { - //} else if (c->operaType == DEL_VALUE) { - //} - - if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) { - if (strcmp(c->colVal, ct->colVal) == 0) { - taosArrayPush(result, &c->uid); - *s = kTypeValue; - } else { - break; + if (qtype == QUERY_TERM) { + if (0 == strcmp(c->colVal, ct->colVal)) { + if (c->operaType == ADD_VALUE) { + INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) + // taosArrayPush(result, &c->uid); + *s = kTypeValue; + } else if (c->operaType == DEL_VALUE) { + INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) + } } - } else if (c->operaType == DEL_VALUE) { - // table is del, not need - *s = kTypeDeletion; - break; } } } tSkipListDestroyIter(iter); return 0; } -int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) { +int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) { if (cache == NULL) { return 0; } @@ -416,6 +411,7 @@ static bool indexCacheIteratorNext(Iterate* itera) { CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); iv->type = ct->operaType; + iv->ver = ct->version; iv->colVal = tstrdup(ct->colVal); taosArrayPush(iv->val, &ct->uid); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index f5f46b0617..fd267fbf03 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -184,12 +184,13 @@ void tfileReaderDestroy(TFileReader* reader) { free(reader); } -int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) { +int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) { SIndexTerm* term = query->term; bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); EIndexQueryType qtype = query->qType; - int ret = -1; + SArray* result = taosArrayInit(16, sizeof(uint64_t)); + int ret = -1; // refactor to callback later if (qtype == QUERY_TERM) { uint64_t offset; @@ -223,6 +224,10 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul // handle later } tfileReaderUnRef(reader); + + taosArrayAddAll(tr->total, result); + taosArrayDestroy(result); + return ret; } @@ -248,7 +253,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c tfileGenFileFullName(fullname, path, suid, colName, version); WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); - indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size); + indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size); if (wc == NULL) { return NULL; } @@ -380,7 +385,7 @@ void indexTFileDestroy(IndexTFile* tfile) { free(tfile); } -int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { +int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result) { int ret = -1; if (tfile == NULL) { return ret; @@ -428,6 +433,7 @@ static bool tfileIteratorNext(Iterate* iiter) { return false; } + iv->ver = 0; iv->type = ADD_VALUE; // value in tfile always ADD_VALUE iv->colVal = colVal; return true; @@ -628,7 +634,7 @@ static int tfileReaderLoadFst(TFileReader* reader) { int64_t ts = taosGetTimestampUs(); int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset); int64_t cost = taosGetTimestampUs() - ts; - indexInfo("nread = %d, and fst offset=%d, size: %d, filename: %s, size: %d, time cost: %" PRId64 "us", nread, + indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread, reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost); // we assuse fst size less than FST_MAX_SIZE assert(nread > 0 && nread <= fstSize); diff --git a/source/libs/index/src/index_util.c b/source/libs/index/src/index_util.c index fc28484de9..dfe4e273a9 100644 --- a/source/libs/index/src/index_util.c +++ b/source/libs/index/src/index_util.c @@ -14,6 +14,7 @@ */ #include "index_util.h" #include "index.h" +#include "tcompare.h" typedef struct MergeIndex { int idx; @@ -135,3 +136,60 @@ void iExcept(SArray *total, SArray *except) { taosArrayPopTailBatch(total, tsz - vIdx); } + +int uidCompare(const void *a, const void *b) { + // add more version compare + uint64_t u1 = *(uint64_t *)a; + uint64_t u2 = *(uint64_t *)b; + return u1 - u2; +} +int verdataCompare(const void *a, const void *b) { + SIdxVerdata *va = (SIdxVerdata *)a; + SIdxVerdata *vb = (SIdxVerdata *)b; + + int32_t cmp = compareUint64Val(&va->data, &vb->data); + if (cmp == 0) { + cmp = 0 - compareUint32Val(&va->ver, &vb->data); + return cmp; + } + return cmp; +} + +SIdxTempResult *sIdxTempResultCreate() { + SIdxTempResult *tr = calloc(1, sizeof(SIdxTempResult)); + + tr->total = taosArrayInit(4, sizeof(uint64_t)); + tr->added = taosArrayInit(4, sizeof(uint64_t)); + tr->deled = taosArrayInit(4, sizeof(uint64_t)); + return tr; +} +void sIdxTempResultClear(SIdxTempResult *tr) { + if (tr == NULL) { + return; + } + taosArrayClear(tr->total); + taosArrayClear(tr->added); + taosArrayClear(tr->deled); +} +void sIdxTempResultDestroy(SIdxTempResult *tr) { + if (tr == NULL) { + return; + } + taosArrayDestroy(tr->total); + taosArrayDestroy(tr->added); + taosArrayDestroy(tr->deled); +} +void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr) { + taosArraySort(tr->total, uidCompare); + taosArraySort(tr->added, uidCompare); + taosArraySort(tr->deled, uidCompare); + + SArray *arrs = taosArrayInit(2, sizeof(void *)); + taosArrayPush(arrs, &tr->total); + taosArrayPush(arrs, &tr->added); + + iUnion(arrs, result); + taosArrayDestroy(arrs); + + iExcept(result, tr->deled); +} diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index a50e91b094..3f46a042ae 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -24,6 +24,7 @@ #include "index_fst_counting_writer.h" #include "index_fst_util.h" #include "index_tfile.h" +#include "index_util.h" #include "tskiplist.h" #include "tutil.h" using namespace std; @@ -393,7 +394,13 @@ class TFileObj { // // } - return tfileReaderSearch(reader_, query, result); + SIdxTempResult* tr = sIdxTempResultCreate(); + + int ret = tfileReaderSearch(reader_, query, tr); + + sIdxTempResultMergeTo(result, tr); + sIdxTempResultDestroy(tr); + return ret; } ~TFileObj() { if (writer_) { @@ -507,9 +514,13 @@ class CacheObj { indexCacheDebug(cache); } int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { - int ret = indexCacheSearch(cache, query, result, s); + SIdxTempResult* tr = sIdxTempResultCreate(); + + int ret = indexCacheSearch(cache, query, tr, s); + sIdxTempResultMergeTo(result, tr); + sIdxTempResultDestroy(tr); + if (ret != 0) { - // std::cout << "failed to get from cache:" << ret << std::endl; } return ret; @@ -649,7 +660,7 @@ class IndexObj { indexInit(); } int Init(const std::string& dir) { - // taosRemoveDir(dir.c_str()); + taosRemoveDir(dir.c_str()); taosMkDir(dir.c_str()); int ret = indexOpen(&opts, dir.c_str(), &idx); if (ret != 0) { @@ -658,6 +669,14 @@ class IndexObj { } return ret; } + void Del(const std::string& colName, const std::string& colVal, uint64_t uid) { + SIndexTerm* term = indexTermCreate(0, DEL_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + Put(terms, uid); + indexMultiTermDestroy(terms); + } int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world", size_t numOfTable = 100 * 10000) { SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), @@ -730,6 +749,7 @@ class IndexObj { std::cout << "search and time cost:" << e - s << "\tquery col:" << colName << "\t val: " << colVal << "\t size:" << taosArrayGetSize(result) << std::endl; } else { + return -1; } int sz = taosArrayGetSize(result); indexMultiTermQueryDestroy(mq); @@ -797,13 +817,9 @@ class IndexObj { class IndexEnv2 : public ::testing::Test { protected: - virtual void SetUp() { - index = new IndexObj(); - } - virtual void TearDown() { - delete index; - } - IndexObj* index; + virtual void SetUp() { index = new IndexObj(); } + virtual void TearDown() { delete index; } + IndexObj* index; }; TEST_F(IndexEnv2, testIndexOpen) { std::string path = "/tmp/test"; @@ -1042,3 +1058,19 @@ TEST_F(IndexEnv2, testIndex_read_performance4) { std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; assert(3 == index->SearchOne("tag10", "Hello")); } +TEST_F(IndexEnv2, testIndex_del) { + std::string path = "/tmp/cache_and_tfile"; + if (index->Init(path) != 0) { + } + for (int i = 0; i < 100; i++) { + index->PutOneTarge("tag10", "Hello", i); + } + index->Del("tag10", "Hello", 12); + index->Del("tag10", "Hello", 11); + + index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000); + + EXPECT_EQ(98, index->SearchOne("tag10", "Hello")); + // std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; + // assert(3 == index->SearchOne("tag10", "Hello")); +} diff --git a/source/libs/monitor/inc/monInt.h b/source/libs/monitor/inc/monInt.h index 65e47ef90c..bfb73af034 100644 --- a/source/libs/monitor/inc/monInt.h +++ b/source/libs/monitor/inc/monInt.h @@ -28,9 +28,24 @@ typedef struct { char content[MON_LOG_LEN]; } SMonLogItem; +typedef struct { + int64_t time; + int64_t req_select; + int64_t req_insert; + int64_t req_insert_batch; + int64_t net_in; + int64_t net_out; + int64_t io_read; + int64_t io_write; + int64_t io_read_disk; + int64_t io_write_disk; +} SMonState; + typedef struct SMonInfo { - SArray *logs; // array of SMonLogItem - SJson *pJson; + int64_t curTime; + SMonState lastState; + SArray *logs; // array of SMonLogItem + SJson *pJson; } SMonInfo; typedef struct { @@ -39,6 +54,7 @@ typedef struct { int32_t maxLogs; const char *server; uint16_t port; + SMonState state; } SMonitor; #ifdef __cplusplus diff --git a/source/libs/monitor/src/monitor.c b/source/libs/monitor/src/monitor.c index 7057f9bcd2..354989a7a1 100644 --- a/source/libs/monitor/src/monitor.c +++ b/source/libs/monitor/src/monitor.c @@ -46,6 +46,7 @@ int32_t monInit(const SMonCfg *pCfg) { tsMonitor.server = pCfg->server; tsMonitor.port = pCfg->port; tsLogFp = monRecordLog; + tsMonitor.state.time = taosGetTimestampMs(); pthread_mutex_init(&tsMonitor.lock, NULL); return 0; } @@ -76,20 +77,23 @@ SMonInfo *monCreateMonitorInfo() { return NULL; } + pMonitor->curTime = taosGetTimestampMs(); + pMonitor->lastState = tsMonitor.state; return pMonitor; } void monCleanupMonitorInfo(SMonInfo *pMonitor) { + tsMonitor.state = pMonitor->lastState; + tsMonitor.state.time = pMonitor->curTime; taosArrayDestroy(pMonitor->logs); tjsonDelete(pMonitor->pJson); free(pMonitor); } void monSetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo) { - SJson *pJson = pMonitor->pJson; - int64_t ms = taosGetTimestampMs(); - char buf[40] = {0}; - taosFormatUtcTime(buf, sizeof(buf), ms, TSDB_TIME_PRECISION_MILLI); + SJson *pJson = pMonitor->pJson; + char buf[40] = {0}; + taosFormatUtcTime(buf, sizeof(buf), pMonitor->curTime, TSDB_TIME_PRECISION_MILLI); tjsonAddStringToObject(pJson, "ts", buf); tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id); @@ -203,6 +207,27 @@ void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) { return; } + SMonState *pLast = &pMonitor->lastState; + double interval = (pMonitor->curTime - pLast->time) / 1000.0; + double req_select_rate = (pInfo->req_select - pLast->req_select) / interval; + double req_insert_rate = (pInfo->req_insert - pLast->req_insert) / interval; + double req_insert_batch_rate = (pInfo->req_insert_batch - pLast->req_insert_batch) / interval; + double net_in_rate = (pInfo->net_in - pLast->net_in) / interval; + double net_out_rate = (pInfo->net_out - pLast->net_out) / interval; + double io_read_rate = (pInfo->io_read - pLast->io_read) / interval; + double io_write_rate = (pInfo->io_write - pLast->io_write) / interval; + double io_read_disk_rate = (pInfo->io_read_disk - pLast->io_read_disk) / interval; + double io_write_disk_rate = (pInfo->io_write_disk - pLast->io_write_disk) / interval; + pLast->req_select = pInfo->req_select; + pLast->req_insert = pInfo->req_insert; + pLast->req_insert_batch = pInfo->req_insert_batch; + pLast->net_in = pInfo->net_in; + pLast->net_out = pInfo->net_out; + pLast->io_read = pInfo->io_read; + pLast->io_write = pInfo->io_write; + pLast->io_read_disk = pInfo->io_read_disk; + pLast->io_write_disk = pInfo->io_write_disk; + tjsonAddDoubleToObject(pJson, "uptime", pInfo->uptime); tjsonAddDoubleToObject(pJson, "cpu_engine", pInfo->cpu_engine); tjsonAddDoubleToObject(pJson, "cpu_system", pInfo->cpu_system); @@ -213,20 +238,20 @@ void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) { tjsonAddDoubleToObject(pJson, "disk_engine", pInfo->disk_engine); tjsonAddDoubleToObject(pJson, "disk_used", pInfo->disk_used); tjsonAddDoubleToObject(pJson, "disk_total", pInfo->disk_total); - tjsonAddDoubleToObject(pJson, "net_in", pInfo->net_in); - tjsonAddDoubleToObject(pJson, "net_out", pInfo->net_out); - tjsonAddDoubleToObject(pJson, "io_read", pInfo->io_read); - tjsonAddDoubleToObject(pJson, "io_write", pInfo->io_write); - tjsonAddDoubleToObject(pJson, "io_read_disk", pInfo->io_read_disk); - tjsonAddDoubleToObject(pJson, "io_write_disk", pInfo->io_write_disk); + tjsonAddDoubleToObject(pJson, "net_in", net_in_rate); + tjsonAddDoubleToObject(pJson, "net_out", net_out_rate); + tjsonAddDoubleToObject(pJson, "io_read", io_read_rate); + tjsonAddDoubleToObject(pJson, "io_write", io_write_rate); + tjsonAddDoubleToObject(pJson, "io_read_disk", io_read_disk_rate); + tjsonAddDoubleToObject(pJson, "io_write_disk", io_write_disk_rate); tjsonAddDoubleToObject(pJson, "req_select", pInfo->req_select); - tjsonAddDoubleToObject(pJson, "req_select_rate", pInfo->req_select_rate); + tjsonAddDoubleToObject(pJson, "req_select_rate", req_select_rate); tjsonAddDoubleToObject(pJson, "req_insert", pInfo->req_insert); tjsonAddDoubleToObject(pJson, "req_insert_success", pInfo->req_insert_success); - tjsonAddDoubleToObject(pJson, "req_insert_rate", pInfo->req_insert_rate); + tjsonAddDoubleToObject(pJson, "req_insert_rate", req_insert_rate); tjsonAddDoubleToObject(pJson, "req_insert_batch", pInfo->req_insert_batch); tjsonAddDoubleToObject(pJson, "req_insert_batch_success", pInfo->req_insert_batch_success); - tjsonAddDoubleToObject(pJson, "req_insert_batch_rate", pInfo->req_insert_batch_rate); + tjsonAddDoubleToObject(pJson, "req_insert_batch_rate", req_insert_batch_rate); tjsonAddDoubleToObject(pJson, "errors", pInfo->errors); tjsonAddDoubleToObject(pJson, "vnodes_num", pInfo->vnodes_num); tjsonAddDoubleToObject(pJson, "masters", pInfo->masters); diff --git a/source/libs/monitor/test/monTest.cpp b/source/libs/monitor/test/monTest.cpp index 3eaab45e3e..a6486b992e 100644 --- a/source/libs/monitor/test/monTest.cpp +++ b/source/libs/monitor/test/monTest.cpp @@ -142,13 +142,10 @@ void MonitorTest::GetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) { pInfo->io_read_disk = 7.1; pInfo->io_write_disk = 7.2; pInfo->req_select = 8; - pInfo->req_select_rate = 8.1; pInfo->req_insert = 9; pInfo->req_insert_success = 10; - pInfo->req_insert_rate = 10.1; pInfo->req_insert_batch = 11; pInfo->req_insert_batch_success = 12; - pInfo->req_insert_batch_rate = 12.3; pInfo->errors = 4; pInfo->vnodes_num = 5; pInfo->masters = 6; diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 8f2820f67a..ff9d5fb71b 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -118,7 +118,7 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { return 0; } -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { +int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { IO_COUNTERS io_counter; if (GetProcessIoCounters(GetCurrentProcess(), &io_counter)) { if (rchars) *rchars = io_counter.ReadTransferCount; @@ -135,9 +135,7 @@ void taosGetSystemInfo() { taosGetTotalMemory(&tsTotalMemoryKB); double tmp1, tmp2, tmp3, tmp4; - taosGetBandSpeed(&tmp1, &tmp2); taosGetCpuUsage(&tmp1, &tmp2); - taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); } void taosKillSystem() { @@ -227,7 +225,7 @@ void taosGetSystemInfo() { tsNumOfCores = sysconf(_SC_NPROCESSORS_ONLN); } -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { +int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { if (rchars) *rchars = 0; if (wchars) *wchars = 0; if (read_bytes) *read_bytes = 0; @@ -336,7 +334,7 @@ static char tsProcCpuFile[25] = {0}; static char tsProcMemFile[25] = {0}; static char tsProcIOFile[25] = {0}; -static void taosGetProcInfos() { +static void taosGetProcIOnfos() { tsPageSizeKB = sysconf(_SC_PAGESIZE) / 1024; tsOpenMax = sysconf(_SC_OPEN_MAX); tsStreamMax = sysconf(_SC_STREAM_MAX); @@ -544,41 +542,7 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { return 0; } -int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec) { - static int64_t last_receive_bytes = 0; - static int64_t last_transmit_bytes = 0; - static int64_t last_time = 0; - int64_t cur_receive_bytes = 0; - int64_t cur_transmit_bytes = 0; - int64_t cur_time = taosGetTimestampMs(); - - if (taosGetCardInfo(&cur_receive_bytes, &cur_transmit_bytes) != 0) { - return -1; - } - - if (last_time == 0 || last_time >= cur_time) { - last_time = cur_time; - last_receive_bytes = cur_receive_bytes; - last_transmit_bytes = cur_transmit_bytes; - *receive_bytes_per_sec = 0; - *transmit_bytes_per_sec = 0; - return 0; - } - - *receive_bytes_per_sec = (cur_receive_bytes - last_receive_bytes) / (double)(cur_time - last_time) * 1000; - *transmit_bytes_per_sec = (cur_transmit_bytes - last_transmit_bytes) / (double)(cur_time - last_time) * 1000; - - last_time = cur_time; - last_transmit_bytes = cur_transmit_bytes; - last_receive_bytes = cur_receive_bytes; - - if (*receive_bytes_per_sec < 0) *receive_bytes_per_sec = 0; - if (*transmit_bytes_per_sec < 0) *transmit_bytes_per_sec = 0; - - return 0; -} - -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { +int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { TdFilePtr pFile = taosOpenFile(tsProcIOFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) return -1; @@ -620,61 +584,13 @@ int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, in return 0; } -int32_t taosGetIOSpeed(double *rchar_per_sec, double *wchar_per_sec, double *read_bytes_per_sec, - double *write_bytes_per_sec) { - static int64_t last_rchar = -1; - static int64_t last_wchar = -1; - static int64_t last_read_bytes = -1; - static int64_t last_write_bytes = -1; - static int64_t last_time = 0; - - int64_t cur_rchar = 0; - int64_t cur_wchar = 0; - int64_t cur_read_bytes = 0; - int64_t cur_write_bytes = 0; - int64_t cur_time = taosGetTimestampMs(); - - if (taosReadProcIO(&cur_rchar, &cur_wchar, &cur_read_bytes, &cur_write_bytes) != 0) { - return -1; - } - - if (last_time == 0 || last_time >= cur_time) { - last_time = cur_time; - last_rchar = cur_rchar; - last_wchar = cur_wchar; - last_read_bytes = cur_read_bytes; - last_write_bytes = cur_write_bytes; - return -1; - } - - *rchar_per_sec = (cur_rchar - last_rchar) / (double)(cur_time - last_time) * 1000; - *wchar_per_sec = (cur_wchar - last_wchar) / (double)(cur_time - last_time) * 1000; - *read_bytes_per_sec = (cur_read_bytes - last_read_bytes) / (double)(cur_time - last_time) * 1000; - *write_bytes_per_sec = (cur_write_bytes - last_write_bytes) / (double)(cur_time - last_time) * 1000; - - last_time = cur_time; - last_rchar = cur_rchar; - last_wchar = cur_wchar; - last_read_bytes = cur_read_bytes; - last_write_bytes = cur_write_bytes; - - if (*rchar_per_sec < 0) *rchar_per_sec = 0; - if (*wchar_per_sec < 0) *wchar_per_sec = 0; - if (*read_bytes_per_sec < 0) *read_bytes_per_sec = 0; - if (*write_bytes_per_sec < 0) *write_bytes_per_sec = 0; - - return 0; -} - void taosGetSystemInfo() { - taosGetProcInfos(); + taosGetProcIOnfos(); taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); double tmp1, tmp2, tmp3, tmp4; - taosGetBandSpeed(&tmp1, &tmp2); taosGetCpuUsage(&tmp1, &tmp2); - taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); } void taosKillSystem() {