From 552c3a1ae6e23482878e9240f3395a55993f8208 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 5 Mar 2022 15:05:13 +0800 Subject: [PATCH 1/3] monitor for vnodes --- include/common/tmsg.h | 5 +++ source/dnode/mgmt/impl/inc/dndEnv.h | 19 ++++++++-- source/dnode/mgmt/impl/inc/dndMnode.h | 1 - source/dnode/mgmt/impl/src/dndMgmt.c | 24 ++++++------ source/dnode/mgmt/impl/src/dndMnode.c | 7 ---- source/dnode/mgmt/impl/src/dndVnodes.c | 51 +++++++++++++++++++++----- source/dnode/vnode/src/vnd/vnodeInt.c | 5 +++ 7 files changed, 81 insertions(+), 31 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1b08d6f241..55279dbe55 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -671,6 +671,11 @@ typedef struct { int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; + int64_t numOfSelectReqs; + int64_t numOfInsertReqs; + int64_t numOfInsertSuccessReqs; + int64_t numOfBatchInsertReqs; + int64_t numOfBatchInsertSuccessReqs; } SVnodeLoad; typedef struct { diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index b9acbea02f..ac72f8772b 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -105,10 +105,23 @@ typedef struct { } SBnodeMgmt; typedef struct { + int32_t openVnodes; + int32_t totalVnodes; + int32_t masterNum; + int64_t numOfSelectReqs; + double speedOfSelectReqs; + int64_t numOfInsertReqs; + int64_t numOfInsertSuccessReqs; + double speedOfInsertReqs; + int64_t numOfBatchInsertReqs; + int64_t numOfBatchInsertSuccessReqs; + double speedOfBatchInsertReqs; + int64_t lastTime; +} SVnodesStat; + +typedef struct { + SVnodesStat stat; SHashObj *hash; - int32_t openVnodes; - int32_t totalVnodes; - int32_t masterNum; SRWLatch latch; SQWorkerPool queryPool; SFWorkerPool fetchPool; diff --git a/source/dnode/mgmt/impl/inc/dndMnode.h b/source/dnode/mgmt/impl/inc/dndMnode.h index 0aee3b4b43..0f03bb3832 100644 --- a/source/dnode/mgmt/impl/inc/dndMnode.h +++ b/source/dnode/mgmt/impl/inc/dndMnode.h @@ -34,7 +34,6 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonGrantInfo *pGrantInfo); -int8_t dndIsMnode(SDnode *pDnode); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index ff9eb5d884..2a231f257d 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -491,18 +491,20 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { pInfo->disk_total = tsDataSpace.size.total; taosGetBandSpeed(&pInfo->net_in, &pInfo->net_out); taosGetIOSpeed(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); - pInfo->req_select = 0; - pInfo->req_select_rate = 0; - pInfo->req_insert = 0; - pInfo->req_insert_success = 0; - pInfo->req_insert_rate = 0; - pInfo->req_insert_batch = 0; - pInfo->req_insert_batch_success = 0; - pInfo->req_insert_batch_rate = 0; + + SVnodesStat *pStat = &pDnode->vmgmt.stat; + pInfo->req_select = pStat->numOfSelectReqs; + pInfo->req_select_rate = pStat->speedOfSelectReqs; + pInfo->req_insert = pStat->numOfInsertReqs; + pInfo->req_insert_success = pStat->numOfInsertSuccessReqs; + pInfo->req_insert_rate = pStat->speedOfInsertReqs; + pInfo->req_insert_batch = pStat->numOfBatchInsertReqs; + pInfo->req_insert_batch_success = pStat->numOfBatchInsertSuccessReqs; + pInfo->req_insert_batch_rate = pStat->speedOfBatchInsertReqs; pInfo->errors = tsNumOfErrorLogs; - pInfo->vnodes_num = pDnode->vmgmt.totalVnodes; - pInfo->masters = pDnode->vmgmt.masterNum; - pInfo->has_mnode = dndIsMnode(pDnode); + pInfo->vnodes_num = pStat->totalVnodes; + pInfo->masters = pStat->masterNum; + pInfo->has_mnode = pDnode->mmgmt.deployed; } static void dndSendMonitorReport(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 47e74b5c57..0ea47c89d8 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -640,10 +640,3 @@ int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SM dndReleaseMnode(pDnode, pMnode); return code; } - -int8_t dndIsMnode(SDnode *pDnode) { - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode == NULL) return 0; - dndReleaseMnode(pDnode, pMnode); - return 1; -} \ No newline at end of file diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 28bc615aba..809f217571 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -382,7 +382,7 @@ static void *dnodeOpenVnodeFunc(void *param) { char stepDesc[TSDB_STEP_DESC_LEN] = {0}; snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId, - pMgmt->openVnodes, pMgmt->totalVnodes); + pMgmt->stat.openVnodes, pMgmt->stat.totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; @@ -396,7 +396,7 @@ static void *dnodeOpenVnodeFunc(void *param) { pThread->opened++; } - atomic_add_fetch_32(&pMgmt->openVnodes, 1); + atomic_add_fetch_32(&pMgmt->stat.openVnodes, 1); } dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, @@ -422,7 +422,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { return -1; } - pMgmt->totalVnodes = numOfVnodes; + pMgmt->stat.totalVnodes = numOfVnodes; int32_t threadNum = tsNumOfCores; #if 1 @@ -470,11 +470,11 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { free(threads); free(pCfgs); - if (pMgmt->openVnodes != pMgmt->totalVnodes) { - dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes); + if (pMgmt->stat.openVnodes != pMgmt->stat.totalVnodes) { + dError("there are total vnodes:%d, opened:%d", pMgmt->stat.totalVnodes, pMgmt->stat.openVnodes); return -1; } else { - dInfo("total vnodes:%d open successfully", pMgmt->totalVnodes); + dInfo("total vnodes:%d open successfully", pMgmt->stat.totalVnodes); return 0; } } @@ -982,10 +982,14 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; int32_t totalVnodes = 0; int32_t masterNum = 0; + int64_t numOfSelectReqs = 0; + int64_t numOfInsertReqs = 0; + int64_t numOfInsertSuccessReqs = 0; + int64_t numOfBatchInsertReqs = 0; + int64_t numOfBatchInsertSuccessReqs = 0; taosRLockLatch(&pMgmt->latch); - int32_t v = 0; void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; @@ -996,12 +1000,41 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { vnodeGetLoad(pVnode->pImpl, &vload); taosArrayPush(pLoads, &vload); + numOfSelectReqs += vload.numOfSelectReqs; + numOfInsertReqs += vload.numOfInsertReqs; + numOfInsertSuccessReqs += vload.numOfInsertSuccessReqs; + numOfBatchInsertReqs += vload.numOfBatchInsertReqs; + numOfBatchInsertSuccessReqs += vload.numOfBatchInsertSuccessReqs; totalVnodes++; if (vload.role == TAOS_SYNC_STATE_LEADER) masterNum++; + pIter = taosHashIterate(pMgmt->hash, pIter); } taosRUnLockLatch(&pMgmt->latch); - pMgmt->totalVnodes = totalVnodes; - pMgmt->masterNum = masterNum; + + SVnodesStat *pStat = &pMgmt->stat; + pStat->totalVnodes = totalVnodes; + pStat->masterNum = masterNum; + + int64_t curTime = taosGetTimestampMs(); + if (pStat->lastTime == 0 || pStat->lastTime >= curTime) { + pStat->lastTime = curTime; + pStat->numOfSelectReqs = numOfSelectReqs; + pStat->numOfInsertReqs = numOfInsertReqs; + pStat->numOfInsertSuccessReqs = numOfInsertSuccessReqs; + pStat->numOfBatchInsertReqs = numOfBatchInsertReqs; + pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; + return; + } + + double interval = (curTime - pStat->lastTime) * 1000.0; + pStat->speedOfSelectReqs = (numOfSelectReqs - pStat->numOfSelectReqs) / interval; + pStat->speedOfInsertReqs = (numOfInsertReqs - pStat->numOfInsertReqs) / interval; + pStat->speedOfBatchInsertReqs = (numOfBatchInsertReqs - pStat->numOfBatchInsertReqs) / interval; + pStat->numOfSelectReqs = numOfSelectReqs; + pStat->numOfInsertReqs = numOfInsertReqs; + pStat->numOfInsertSuccessReqs = numOfInsertSuccessReqs; + pStat->numOfBatchInsertReqs = numOfBatchInsertReqs; + pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; } diff --git a/source/dnode/vnode/src/vnd/vnodeInt.c b/source/dnode/vnode/src/vnd/vnodeInt.c index 6d3fa5f7f3..7d0b594e95 100644 --- a/source/dnode/vnode/src/vnd/vnodeInt.c +++ b/source/dnode/vnode/src/vnd/vnodeInt.c @@ -32,6 +32,11 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->totalStorage = 300; pLoad->compStorage = 200; pLoad->pointsWritten = 100; + pLoad->numOfSelectReqs = 1; + pLoad->numOfInsertReqs = 3; + pLoad->numOfInsertSuccessReqs = 2; + pLoad->numOfBatchInsertReqs = 5; + pLoad->numOfBatchInsertSuccessReqs = 4; return 0; } From c03f4abd795a1513910cd6ab4fa1b0cd1639be64 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 5 Mar 2022 16:09:55 +0800 Subject: [PATCH 2/3] monitor --- include/libs/monitor/monitor.h | 25 +++---- include/os/osSysinfo.h | 4 +- source/dnode/mgmt/impl/inc/dndEnv.h | 4 -- source/dnode/mgmt/impl/src/dndMgmt.c | 7 +- source/dnode/mgmt/impl/src/dndVnodes.c | 20 +----- source/libs/monitor/inc/monInt.h | 20 +++++- source/libs/monitor/src/monitor.c | 51 ++++++++++---- source/libs/monitor/test/monTest.cpp | 3 - source/os/src/osSysinfo.c | 94 ++------------------------ 9 files changed, 77 insertions(+), 151 deletions(-) diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 1695edd983..0c832f802b 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -95,20 +95,17 @@ typedef struct { int64_t disk_engine; // Byte int64_t disk_used; // Byte int64_t disk_total; // Byte - double net_in; // bytes per second - double net_out; // bytes per second - double io_read; - double io_write; - double io_read_disk; - double io_write_disk; - int32_t req_select; - float req_select_rate; - int32_t req_insert; - int32_t req_insert_success; - float req_insert_rate; - int32_t req_insert_batch; - int32_t req_insert_batch_success; - float req_insert_batch_rate; + int64_t net_in; // bytes + int64_t net_out; // bytes + int64_t io_read; // bytes + int64_t io_write; // bytes + int64_t io_read_disk; // bytes + int64_t io_write_disk; // bytes + int64_t req_select; + int64_t req_insert; + int64_t req_insert_success; + int64_t req_insert_batch; + int64_t req_insert_batch_success; int32_t errors; int32_t vnodes_num; int32_t masters; diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 9081fa9715..1ebad370b5 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -43,10 +43,8 @@ int32_t taosGetTotalMemory(int64_t *totalKB); int32_t taosGetProcMemory(int64_t *usedKB); int32_t taosGetSysMemory(int64_t *usedKB); int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize); -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes); -int32_t taosGetIOSpeed(double *readKB, double *writeKB, double *readDiskKB, double *writeDiskKB); +int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes); int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes); -int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec); int32_t taosSystem(const char *cmd); void taosKillSystem(); diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index ac72f8772b..13ef101908 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -109,14 +109,10 @@ typedef struct { int32_t totalVnodes; int32_t masterNum; int64_t numOfSelectReqs; - double speedOfSelectReqs; int64_t numOfInsertReqs; int64_t numOfInsertSuccessReqs; - double speedOfInsertReqs; int64_t numOfBatchInsertReqs; int64_t numOfBatchInsertSuccessReqs; - double speedOfBatchInsertReqs; - int64_t lastTime; } SVnodesStat; typedef struct { diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 2a231f257d..1782b2aba0 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -489,18 +489,15 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { pInfo->disk_engine = 0; pInfo->disk_used = tsDataSpace.size.used; pInfo->disk_total = tsDataSpace.size.total; - taosGetBandSpeed(&pInfo->net_in, &pInfo->net_out); - taosGetIOSpeed(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); + taosGetCardInfo(&pInfo->net_in, &pInfo->net_out); + taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); SVnodesStat *pStat = &pDnode->vmgmt.stat; pInfo->req_select = pStat->numOfSelectReqs; - pInfo->req_select_rate = pStat->speedOfSelectReqs; pInfo->req_insert = pStat->numOfInsertReqs; pInfo->req_insert_success = pStat->numOfInsertSuccessReqs; - pInfo->req_insert_rate = pStat->speedOfInsertReqs; pInfo->req_insert_batch = pStat->numOfBatchInsertReqs; pInfo->req_insert_batch_success = pStat->numOfBatchInsertSuccessReqs; - pInfo->req_insert_batch_rate = pStat->speedOfBatchInsertReqs; pInfo->errors = tsNumOfErrorLogs; pInfo->vnodes_num = pStat->totalVnodes; pInfo->masters = pStat->masterNum; diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 809f217571..d311e1e417 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -980,6 +980,7 @@ void dndCleanupVnodes(SDnode *pDnode) { void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; + SVnodesStat *pStat = &pMgmt->stat; int32_t totalVnodes = 0; int32_t masterNum = 0; int64_t numOfSelectReqs = 0; @@ -990,7 +991,7 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { taosRLockLatch(&pMgmt->latch); - void *pIter = taosHashIterate(pMgmt->hash, NULL); + void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL || *ppVnode == NULL) continue; @@ -1013,25 +1014,8 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { taosRUnLockLatch(&pMgmt->latch); - SVnodesStat *pStat = &pMgmt->stat; pStat->totalVnodes = totalVnodes; pStat->masterNum = masterNum; - - int64_t curTime = taosGetTimestampMs(); - if (pStat->lastTime == 0 || pStat->lastTime >= curTime) { - pStat->lastTime = curTime; - pStat->numOfSelectReqs = numOfSelectReqs; - pStat->numOfInsertReqs = numOfInsertReqs; - pStat->numOfInsertSuccessReqs = numOfInsertSuccessReqs; - pStat->numOfBatchInsertReqs = numOfBatchInsertReqs; - pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; - return; - } - - double interval = (curTime - pStat->lastTime) * 1000.0; - pStat->speedOfSelectReqs = (numOfSelectReqs - pStat->numOfSelectReqs) / interval; - pStat->speedOfInsertReqs = (numOfInsertReqs - pStat->numOfInsertReqs) / interval; - pStat->speedOfBatchInsertReqs = (numOfBatchInsertReqs - pStat->numOfBatchInsertReqs) / interval; pStat->numOfSelectReqs = numOfSelectReqs; pStat->numOfInsertReqs = numOfInsertReqs; pStat->numOfInsertSuccessReqs = numOfInsertSuccessReqs; diff --git a/source/libs/monitor/inc/monInt.h b/source/libs/monitor/inc/monInt.h index 65e47ef90c..bfb73af034 100644 --- a/source/libs/monitor/inc/monInt.h +++ b/source/libs/monitor/inc/monInt.h @@ -28,9 +28,24 @@ typedef struct { char content[MON_LOG_LEN]; } SMonLogItem; +typedef struct { + int64_t time; + int64_t req_select; + int64_t req_insert; + int64_t req_insert_batch; + int64_t net_in; + int64_t net_out; + int64_t io_read; + int64_t io_write; + int64_t io_read_disk; + int64_t io_write_disk; +} SMonState; + typedef struct SMonInfo { - SArray *logs; // array of SMonLogItem - SJson *pJson; + int64_t curTime; + SMonState lastState; + SArray *logs; // array of SMonLogItem + SJson *pJson; } SMonInfo; typedef struct { @@ -39,6 +54,7 @@ typedef struct { int32_t maxLogs; const char *server; uint16_t port; + SMonState state; } SMonitor; #ifdef __cplusplus diff --git a/source/libs/monitor/src/monitor.c b/source/libs/monitor/src/monitor.c index 7057f9bcd2..354989a7a1 100644 --- a/source/libs/monitor/src/monitor.c +++ b/source/libs/monitor/src/monitor.c @@ -46,6 +46,7 @@ int32_t monInit(const SMonCfg *pCfg) { tsMonitor.server = pCfg->server; tsMonitor.port = pCfg->port; tsLogFp = monRecordLog; + tsMonitor.state.time = taosGetTimestampMs(); pthread_mutex_init(&tsMonitor.lock, NULL); return 0; } @@ -76,20 +77,23 @@ SMonInfo *monCreateMonitorInfo() { return NULL; } + pMonitor->curTime = taosGetTimestampMs(); + pMonitor->lastState = tsMonitor.state; return pMonitor; } void monCleanupMonitorInfo(SMonInfo *pMonitor) { + tsMonitor.state = pMonitor->lastState; + tsMonitor.state.time = pMonitor->curTime; taosArrayDestroy(pMonitor->logs); tjsonDelete(pMonitor->pJson); free(pMonitor); } void monSetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo) { - SJson *pJson = pMonitor->pJson; - int64_t ms = taosGetTimestampMs(); - char buf[40] = {0}; - taosFormatUtcTime(buf, sizeof(buf), ms, TSDB_TIME_PRECISION_MILLI); + SJson *pJson = pMonitor->pJson; + char buf[40] = {0}; + taosFormatUtcTime(buf, sizeof(buf), pMonitor->curTime, TSDB_TIME_PRECISION_MILLI); tjsonAddStringToObject(pJson, "ts", buf); tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id); @@ -203,6 +207,27 @@ void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) { return; } + SMonState *pLast = &pMonitor->lastState; + double interval = (pMonitor->curTime - pLast->time) / 1000.0; + double req_select_rate = (pInfo->req_select - pLast->req_select) / interval; + double req_insert_rate = (pInfo->req_insert - pLast->req_insert) / interval; + double req_insert_batch_rate = (pInfo->req_insert_batch - pLast->req_insert_batch) / interval; + double net_in_rate = (pInfo->net_in - pLast->net_in) / interval; + double net_out_rate = (pInfo->net_out - pLast->net_out) / interval; + double io_read_rate = (pInfo->io_read - pLast->io_read) / interval; + double io_write_rate = (pInfo->io_write - pLast->io_write) / interval; + double io_read_disk_rate = (pInfo->io_read_disk - pLast->io_read_disk) / interval; + double io_write_disk_rate = (pInfo->io_write_disk - pLast->io_write_disk) / interval; + pLast->req_select = pInfo->req_select; + pLast->req_insert = pInfo->req_insert; + pLast->req_insert_batch = pInfo->req_insert_batch; + pLast->net_in = pInfo->net_in; + pLast->net_out = pInfo->net_out; + pLast->io_read = pInfo->io_read; + pLast->io_write = pInfo->io_write; + pLast->io_read_disk = pInfo->io_read_disk; + pLast->io_write_disk = pInfo->io_write_disk; + tjsonAddDoubleToObject(pJson, "uptime", pInfo->uptime); tjsonAddDoubleToObject(pJson, "cpu_engine", pInfo->cpu_engine); tjsonAddDoubleToObject(pJson, "cpu_system", pInfo->cpu_system); @@ -213,20 +238,20 @@ void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) { tjsonAddDoubleToObject(pJson, "disk_engine", pInfo->disk_engine); tjsonAddDoubleToObject(pJson, "disk_used", pInfo->disk_used); tjsonAddDoubleToObject(pJson, "disk_total", pInfo->disk_total); - tjsonAddDoubleToObject(pJson, "net_in", pInfo->net_in); - tjsonAddDoubleToObject(pJson, "net_out", pInfo->net_out); - tjsonAddDoubleToObject(pJson, "io_read", pInfo->io_read); - tjsonAddDoubleToObject(pJson, "io_write", pInfo->io_write); - tjsonAddDoubleToObject(pJson, "io_read_disk", pInfo->io_read_disk); - tjsonAddDoubleToObject(pJson, "io_write_disk", pInfo->io_write_disk); + tjsonAddDoubleToObject(pJson, "net_in", net_in_rate); + tjsonAddDoubleToObject(pJson, "net_out", net_out_rate); + tjsonAddDoubleToObject(pJson, "io_read", io_read_rate); + tjsonAddDoubleToObject(pJson, "io_write", io_write_rate); + tjsonAddDoubleToObject(pJson, "io_read_disk", io_read_disk_rate); + tjsonAddDoubleToObject(pJson, "io_write_disk", io_write_disk_rate); tjsonAddDoubleToObject(pJson, "req_select", pInfo->req_select); - tjsonAddDoubleToObject(pJson, "req_select_rate", pInfo->req_select_rate); + tjsonAddDoubleToObject(pJson, "req_select_rate", req_select_rate); tjsonAddDoubleToObject(pJson, "req_insert", pInfo->req_insert); tjsonAddDoubleToObject(pJson, "req_insert_success", pInfo->req_insert_success); - tjsonAddDoubleToObject(pJson, "req_insert_rate", pInfo->req_insert_rate); + tjsonAddDoubleToObject(pJson, "req_insert_rate", req_insert_rate); tjsonAddDoubleToObject(pJson, "req_insert_batch", pInfo->req_insert_batch); tjsonAddDoubleToObject(pJson, "req_insert_batch_success", pInfo->req_insert_batch_success); - tjsonAddDoubleToObject(pJson, "req_insert_batch_rate", pInfo->req_insert_batch_rate); + tjsonAddDoubleToObject(pJson, "req_insert_batch_rate", req_insert_batch_rate); tjsonAddDoubleToObject(pJson, "errors", pInfo->errors); tjsonAddDoubleToObject(pJson, "vnodes_num", pInfo->vnodes_num); tjsonAddDoubleToObject(pJson, "masters", pInfo->masters); diff --git a/source/libs/monitor/test/monTest.cpp b/source/libs/monitor/test/monTest.cpp index 3eaab45e3e..a6486b992e 100644 --- a/source/libs/monitor/test/monTest.cpp +++ b/source/libs/monitor/test/monTest.cpp @@ -142,13 +142,10 @@ void MonitorTest::GetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) { pInfo->io_read_disk = 7.1; pInfo->io_write_disk = 7.2; pInfo->req_select = 8; - pInfo->req_select_rate = 8.1; pInfo->req_insert = 9; pInfo->req_insert_success = 10; - pInfo->req_insert_rate = 10.1; pInfo->req_insert_batch = 11; pInfo->req_insert_batch_success = 12; - pInfo->req_insert_batch_rate = 12.3; pInfo->errors = 4; pInfo->vnodes_num = 5; pInfo->masters = 6; diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 8f2820f67a..ff9d5fb71b 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -118,7 +118,7 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { return 0; } -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { +int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { IO_COUNTERS io_counter; if (GetProcessIoCounters(GetCurrentProcess(), &io_counter)) { if (rchars) *rchars = io_counter.ReadTransferCount; @@ -135,9 +135,7 @@ void taosGetSystemInfo() { taosGetTotalMemory(&tsTotalMemoryKB); double tmp1, tmp2, tmp3, tmp4; - taosGetBandSpeed(&tmp1, &tmp2); taosGetCpuUsage(&tmp1, &tmp2); - taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); } void taosKillSystem() { @@ -227,7 +225,7 @@ void taosGetSystemInfo() { tsNumOfCores = sysconf(_SC_NPROCESSORS_ONLN); } -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { +int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { if (rchars) *rchars = 0; if (wchars) *wchars = 0; if (read_bytes) *read_bytes = 0; @@ -336,7 +334,7 @@ static char tsProcCpuFile[25] = {0}; static char tsProcMemFile[25] = {0}; static char tsProcIOFile[25] = {0}; -static void taosGetProcInfos() { +static void taosGetProcIOnfos() { tsPageSizeKB = sysconf(_SC_PAGESIZE) / 1024; tsOpenMax = sysconf(_SC_OPEN_MAX); tsStreamMax = sysconf(_SC_STREAM_MAX); @@ -544,41 +542,7 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { return 0; } -int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec) { - static int64_t last_receive_bytes = 0; - static int64_t last_transmit_bytes = 0; - static int64_t last_time = 0; - int64_t cur_receive_bytes = 0; - int64_t cur_transmit_bytes = 0; - int64_t cur_time = taosGetTimestampMs(); - - if (taosGetCardInfo(&cur_receive_bytes, &cur_transmit_bytes) != 0) { - return -1; - } - - if (last_time == 0 || last_time >= cur_time) { - last_time = cur_time; - last_receive_bytes = cur_receive_bytes; - last_transmit_bytes = cur_transmit_bytes; - *receive_bytes_per_sec = 0; - *transmit_bytes_per_sec = 0; - return 0; - } - - *receive_bytes_per_sec = (cur_receive_bytes - last_receive_bytes) / (double)(cur_time - last_time) * 1000; - *transmit_bytes_per_sec = (cur_transmit_bytes - last_transmit_bytes) / (double)(cur_time - last_time) * 1000; - - last_time = cur_time; - last_transmit_bytes = cur_transmit_bytes; - last_receive_bytes = cur_receive_bytes; - - if (*receive_bytes_per_sec < 0) *receive_bytes_per_sec = 0; - if (*transmit_bytes_per_sec < 0) *transmit_bytes_per_sec = 0; - - return 0; -} - -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { +int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { TdFilePtr pFile = taosOpenFile(tsProcIOFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) return -1; @@ -620,61 +584,13 @@ int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, in return 0; } -int32_t taosGetIOSpeed(double *rchar_per_sec, double *wchar_per_sec, double *read_bytes_per_sec, - double *write_bytes_per_sec) { - static int64_t last_rchar = -1; - static int64_t last_wchar = -1; - static int64_t last_read_bytes = -1; - static int64_t last_write_bytes = -1; - static int64_t last_time = 0; - - int64_t cur_rchar = 0; - int64_t cur_wchar = 0; - int64_t cur_read_bytes = 0; - int64_t cur_write_bytes = 0; - int64_t cur_time = taosGetTimestampMs(); - - if (taosReadProcIO(&cur_rchar, &cur_wchar, &cur_read_bytes, &cur_write_bytes) != 0) { - return -1; - } - - if (last_time == 0 || last_time >= cur_time) { - last_time = cur_time; - last_rchar = cur_rchar; - last_wchar = cur_wchar; - last_read_bytes = cur_read_bytes; - last_write_bytes = cur_write_bytes; - return -1; - } - - *rchar_per_sec = (cur_rchar - last_rchar) / (double)(cur_time - last_time) * 1000; - *wchar_per_sec = (cur_wchar - last_wchar) / (double)(cur_time - last_time) * 1000; - *read_bytes_per_sec = (cur_read_bytes - last_read_bytes) / (double)(cur_time - last_time) * 1000; - *write_bytes_per_sec = (cur_write_bytes - last_write_bytes) / (double)(cur_time - last_time) * 1000; - - last_time = cur_time; - last_rchar = cur_rchar; - last_wchar = cur_wchar; - last_read_bytes = cur_read_bytes; - last_write_bytes = cur_write_bytes; - - if (*rchar_per_sec < 0) *rchar_per_sec = 0; - if (*wchar_per_sec < 0) *wchar_per_sec = 0; - if (*read_bytes_per_sec < 0) *read_bytes_per_sec = 0; - if (*write_bytes_per_sec < 0) *write_bytes_per_sec = 0; - - return 0; -} - void taosGetSystemInfo() { - taosGetProcInfos(); + taosGetProcIOnfos(); taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); double tmp1, tmp2, tmp3, tmp4; - taosGetBandSpeed(&tmp1, &tmp2); taosGetCpuUsage(&tmp1, &tmp2); - taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); } void taosKillSystem() { From 13ecd2b73f1dad1eb9dabda842710e6387476a0a Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 5 Mar 2022 16:36:25 +0800 Subject: [PATCH 3/3] Feature/td 11463 3.0 (#10562) * Block-wise SMA extraction * refactor the SBlock * add method tsdbLoadBlockOffset * set method tsdbLoadBlockOffset static * refactor * trigger CI * minor change * trigger CI * add STSma defintion * add STSma schema encode/decode * restore * code optimization --- include/common/tmsg.h | 143 ++++++++++++++++++++++ include/util/tdef.h | 1 + source/dnode/vnode/CMakeLists.txt | 2 +- source/dnode/vnode/src/inc/tsdbCommit.h | 8 ++ source/dnode/vnode/src/inc/tsdbFS.h | 8 ++ source/dnode/vnode/src/inc/tsdbFile.h | 8 ++ source/dnode/vnode/src/inc/tsdbLog.h | 8 ++ source/dnode/vnode/src/inc/tsdbMemory.h | 7 ++ source/dnode/vnode/src/inc/tsdbReadImpl.h | 8 ++ source/dnode/vnode/src/meta/metaBDBImpl.c | 8 ++ source/dnode/vnode/src/vnd/vnodeWrite.c | 27 ++-- source/dnode/vnode/test/CMakeLists.txt | 55 ++++++--- source/dnode/vnode/test/tsdbSmaTest.cpp | 140 +++++++++++++++++++++ 13 files changed, 392 insertions(+), 31 deletions(-) create mode 100644 source/dnode/vnode/test/tsdbSmaTest.cpp diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 55279dbe55..134b9c8ec9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1842,6 +1842,149 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) } return buf; } +typedef enum { + TD_TIME_UNIT_UNKNOWN = -1, + TD_TIME_UNIT_YEAR = 0, + TD_TIME_UNIT_SEASON = 1, + TD_TIME_UNIT_MONTH = 2, + TD_TIME_UNIT_WEEK = 3, + TD_TIME_UNIT_DAY = 4, + TD_TIME_UNIT_HOUR = 5, + TD_TIME_UNIT_MINUTE = 6, + TD_TIME_UNIT_SEC = 7, + TD_TIME_UNIT_MILLISEC = 8, + TD_TIME_UNIT_MICROSEC = 9, + TD_TIME_UNIT_NANOSEC = 10 +} ETDTimeUnit; +typedef struct { + uint8_t version; // for compatibility + uint8_t intervalUnit; + uint8_t slidingUnit; + char indexName[TSDB_INDEX_NAME_LEN + 1]; + col_id_t numOfColIds; + uint16_t numOfFuncIds; + uint64_t tableUid; // super/common table uid + int64_t interval; + int64_t sliding; + col_id_t* colIds; // N.B. sorted column ids + uint16_t* funcIds; // N.B. sorted sma function ids +} STSma; // Time-range-wise SMA + +typedef struct { + uint32_t number; + STSma* tSma; +} STSmaWrapper; + +static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) { + if (pSma) { + tfree(pSma->colIds); + tfree(pSma->funcIds); + if (releaseSelf) { + free(pSma); + } + } +} + +static FORCE_INLINE void tdDestroyWrapper(STSmaWrapper* pSW) { + if (pSW && pSW->tSma) { + for (uint32_t i = 0; i < pSW->number; ++i) { + tdDestroyTSma(pSW->tSma + i, false); + } + tfree(pSW->tSma); + } +} + +static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) { + int32_t tlen = 0; + + tlen += taosEncodeFixedU8(buf, pSma->version); + tlen += taosEncodeFixedU8(buf, pSma->intervalUnit); + tlen += taosEncodeFixedU8(buf, pSma->slidingUnit); + tlen += taosEncodeString(buf, pSma->indexName); + tlen += taosEncodeFixedU16(buf, pSma->numOfColIds); + tlen += taosEncodeFixedU16(buf, pSma->numOfFuncIds); + tlen += taosEncodeFixedU64(buf, pSma->tableUid); + tlen += taosEncodeFixedI64(buf, pSma->interval); + tlen += taosEncodeFixedI64(buf, pSma->sliding); + + for (col_id_t i = 0; i < pSma->numOfColIds; ++i) { + tlen += taosEncodeFixedU16(buf, *(pSma->colIds + i)); + } + + for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) { + tlen += taosEncodeFixedU16(buf, *(pSma->funcIds + i)); + } + + return tlen; +} + +static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* pSW) { + int32_t tlen = 0; + + tlen += taosEncodeFixedU32(buf, pSW->number); + for (uint32_t i = 0; i < pSW->number; ++i) { + tlen += tEncodeTSma(buf, pSW->tSma + i); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) { + buf = taosDecodeFixedU8(buf, &pSma->version); + buf = taosDecodeFixedU8(buf, &pSma->intervalUnit); + buf = taosDecodeFixedU8(buf, &pSma->slidingUnit); + buf = taosDecodeStringTo(buf, pSma->indexName); + buf = taosDecodeFixedU16(buf, &pSma->numOfColIds); + buf = taosDecodeFixedU16(buf, &pSma->numOfFuncIds); + buf = taosDecodeFixedU64(buf, &pSma->tableUid); + buf = taosDecodeFixedI64(buf, &pSma->interval); + buf = taosDecodeFixedI64(buf, &pSma->sliding); + + if (pSma->numOfColIds > 0) { + pSma->colIds = (col_id_t*)calloc(pSma->numOfColIds, sizeof(STSma)); + if (pSma->colIds == NULL) { + return NULL; + } + for (uint16_t i = 0; i < pSma->numOfColIds; ++i) { + buf = taosDecodeFixedU16(buf, pSma->colIds + i); + } + } else { + pSma->colIds = NULL; + } + + if (pSma->numOfFuncIds > 0) { + pSma->funcIds = (uint16_t*)calloc(pSma->numOfFuncIds, sizeof(STSma)); + if (pSma->funcIds == NULL) { + return NULL; + } + for (uint16_t i = 0; i < pSma->numOfFuncIds; ++i) { + buf = taosDecodeFixedU16(buf, pSma->funcIds + i); + } + } else { + pSma->funcIds = NULL; + } + + return buf; +} + +static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) { + buf = taosDecodeFixedU32(buf, &pSW->number); + + pSW->tSma = (STSma*)calloc(pSW->number, sizeof(STSma)); + if (pSW->tSma == NULL) { + return NULL; + } + + for (uint32_t i = 0; i < pSW->number; ++i) { + if ((buf = tDecodeTSma(buf, pSW->tSma + i)) == NULL) { + for (uint32_t j = i; j >= 0; --i) { + tdDestroyTSma(pSW->tSma + j, false); + } + free(pSW->tSma); + return NULL; + } + } + return buf; +} typedef struct { int64_t uid; diff --git a/include/util/tdef.h b/include/util/tdef.h index a1b4fc11cf..9695c2e4c8 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -206,6 +206,7 @@ typedef enum ELogicConditionType { #define TSDB_FUNC_TYPE_AGGREGATE 2 #define TSDB_FUNC_MAX_RETRIEVE 1024 +#define TSDB_INDEX_NAME_LEN 32 #define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 3c0d45df63..f026f8331b 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -54,5 +54,5 @@ elseif(${META_DB_IMPL} STREQUAL "TDB") endif() if(${BUILD_TEST}) - # add_subdirectory(test) + add_subdirectory(test) endif(${BUILD_TEST}) diff --git a/source/dnode/vnode/src/inc/tsdbCommit.h b/source/dnode/vnode/src/inc/tsdbCommit.h index 9c35d06880..699aeaa133 100644 --- a/source/dnode/vnode/src/inc/tsdbCommit.h +++ b/source/dnode/vnode/src/inc/tsdbCommit.h @@ -16,6 +16,10 @@ #ifndef _TD_TSDB_COMMIT_H_ #define _TD_TSDB_COMMIT_H_ +#ifdef __cplusplus +extern "C" { +#endif + typedef struct { int minFid; int midFid; @@ -66,4 +70,8 @@ int tsdbApplyRtn(STsdbRepo *pRepo); #endif +#ifdef __cplusplus +} +#endif + #endif /* _TD_TSDB_COMMIT_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/tsdbFS.h b/source/dnode/vnode/src/inc/tsdbFS.h index dab697ce8b..71f35a9eca 100644 --- a/source/dnode/vnode/src/inc/tsdbFS.h +++ b/source/dnode/vnode/src/inc/tsdbFS.h @@ -18,6 +18,10 @@ #include "tsdbFile.h" +#ifdef __cplusplus +extern "C" { +#endif + // ================== TSDB global config extern bool tsdbForceKeepFile; @@ -111,4 +115,8 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { return 0; } +#ifdef __cplusplus +} +#endif + #endif /* _TD_TSDB_FS_H_ */ diff --git a/source/dnode/vnode/src/inc/tsdbFile.h b/source/dnode/vnode/src/inc/tsdbFile.h index bbeb6c6a0f..1034ae015a 100644 --- a/source/dnode/vnode/src/inc/tsdbFile.h +++ b/source/dnode/vnode/src/inc/tsdbFile.h @@ -19,6 +19,10 @@ #include "tchecksum.h" #include "tfs.h" +#ifdef __cplusplus +extern "C" { +#endif + #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF @@ -410,4 +414,8 @@ static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) { return true; } +#ifdef __cplusplus +} +#endif + #endif /* _TS_TSDB_FILE_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/tsdbLog.h b/source/dnode/vnode/src/inc/tsdbLog.h index 6ab17ec587..56dc8ab2a0 100644 --- a/source/dnode/vnode/src/inc/tsdbLog.h +++ b/source/dnode/vnode/src/inc/tsdbLog.h @@ -18,6 +18,10 @@ #include "tlog.h" +#ifdef __cplusplus +extern "C" { +#endif + extern int32_t tsdbDebugFlag; #define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) @@ -27,4 +31,8 @@ extern int32_t tsdbDebugFlag; #define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#ifdef __cplusplus +} +#endif + #endif /* _TD_TSDB_LOG_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/tsdbMemory.h b/source/dnode/vnode/src/inc/tsdbMemory.h index 1fc4cd9e52..df4df0053f 100644 --- a/source/dnode/vnode/src/inc/tsdbMemory.h +++ b/source/dnode/vnode/src/inc/tsdbMemory.h @@ -16,6 +16,10 @@ #ifndef _TD_TSDB_MEMORY_H_ #define _TD_TSDB_MEMORY_H_ +#ifdef __cplusplus +extern "C" { +#endif + static void * taosTMalloc(size_t size); static void * taosTCalloc(size_t nmemb, size_t size); static void * taosTRealloc(void *ptr, size_t size); @@ -70,5 +74,8 @@ static FORCE_INLINE void* taosTZfree(void* ptr) { return NULL; } +#ifdef __cplusplus +} +#endif #endif /* _TD_TSDB_MEMORY_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/tsdbReadImpl.h b/source/dnode/vnode/src/inc/tsdbReadImpl.h index 16eb55967c..cd24358b27 100644 --- a/source/dnode/vnode/src/inc/tsdbReadImpl.h +++ b/source/dnode/vnode/src/inc/tsdbReadImpl.h @@ -24,6 +24,10 @@ #include "tsdbMemory.h" #include "tcommon.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct SReadH SReadH; typedef struct { @@ -244,4 +248,8 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { return 0; } +#ifdef __cplusplus +} +#endif + #endif /*_TD_TSDB_READ_IMPL_H_*/ diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index f75d9d3db0..82b4d3ce12 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -38,6 +38,8 @@ struct SMetaDB { // DB DB *pTbDB; DB *pSchemaDB; + DB *pSmaDB; + // IDX DB *pNameIdx; DB *pStbIdx; @@ -100,6 +102,11 @@ int metaOpenDB(SMeta *pMeta) { return -1; } + if (metaOpenBDBDb(&(pDB->pSmaDB), pDB->pEvn, "sma.db", false) < 0) { + metaCloseDB(pMeta); + return -1; + } + // Open Indices if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "name.index", pDB->pTbDB, &metaNameIdxCb, false) < 0) { metaCloseDB(pMeta); @@ -130,6 +137,7 @@ void metaCloseDB(SMeta *pMeta) { metaCloseBDBIdx(pMeta->pDB->pNtbIdx); metaCloseBDBIdx(pMeta->pDB->pStbIdx); metaCloseBDBIdx(pMeta->pDB->pNameIdx); + metaCloseBDBDb(pMeta->pDB->pSmaDB); metaCloseBDBDb(pMeta->pDB->pSchemaDB); metaCloseBDBDb(pMeta->pDB->pTbDB); metaCloseBDBEnv(pMeta->pDB->pEvn); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 81eb09f48f..a2616307ff 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -41,10 +41,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { return 0; } -int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - SVCreateTbReq vCreateTbReq; - SVCreateTbBatchReq vCreateTbBatchReq; - void *ptr = NULL; +int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + void *ptr = NULL; if (pVnode->config.streamMode == 0) { ptr = vnodeMalloc(pVnode, pMsg->contLen); @@ -64,7 +62,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } switch (pMsg->msgType) { - case TDMT_VND_CREATE_STB: + case TDMT_VND_CREATE_STB: { + SVCreateTbReq vCreateTbReq = {0}; tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { // TODO: handle error @@ -75,7 +74,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { free(vCreateTbReq.stbCfg.pTagSchema); free(vCreateTbReq.name); break; - case TDMT_VND_CREATE_TABLE: + } + case TDMT_VND_CREATE_TABLE: { + SVCreateTbBatchReq vCreateTbBatchReq = {0}; tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq); for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) { SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); @@ -97,14 +98,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); taosArrayDestroy(vCreateTbBatchReq.pArray); break; - - case TDMT_VND_ALTER_STB: + } + case TDMT_VND_ALTER_STB: { + SVCreateTbReq vAlterTbReq = {0}; vTrace("vgId:%d, process alter stb req", pVnode->vgId); - tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); - free(vCreateTbReq.stbCfg.pSchema); - free(vCreateTbReq.stbCfg.pTagSchema); - free(vCreateTbReq.name); + tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq); + free(vAlterTbReq.stbCfg.pSchema); + free(vAlterTbReq.stbCfg.pTagSchema); + free(vAlterTbReq.name); break; + } case TDMT_VND_DROP_STB: vTrace("vgId:%d, process drop stb req", pVnode->vgId); break; diff --git a/source/dnode/vnode/test/CMakeLists.txt b/source/dnode/vnode/test/CMakeLists.txt index 6b468497d5..af123a3133 100644 --- a/source/dnode/vnode/test/CMakeLists.txt +++ b/source/dnode/vnode/test/CMakeLists.txt @@ -1,20 +1,39 @@ -add_executable(tqTest "") -target_sources(tqTest - PRIVATE - "tqMetaTest.cpp" -) -target_include_directories(tqTest - PUBLIC - "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +MESSAGE(STATUS "vnode unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +# add_executable(tqTest "") +# target_sources(tqTest +# PRIVATE +# "tqMetaTest.cpp" +# ) +# target_include_directories(tqTest +# PUBLIC +# "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" +# "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +# ) + +# target_link_libraries(tqTest +# tq +# gtest_main +# ) +# enable_testing() +# add_test( +# NAME tq_test +# COMMAND tqTest +# ) + +ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp) +TARGET_LINK_LIBRARIES( + tsdbSmaTest + PUBLIC os util common vnode gtest_main ) -target_link_libraries(tqTest - tq - gtest_main -) -enable_testing() -add_test( - NAME tq_test - COMMAND tqTest -) +TARGET_INCLUDE_DIRECTORIES( + tsdbSmaTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/common" + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../src/inc" + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) \ No newline at end of file diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp new file mode 100644 index 0000000000..f5aa82cb2d --- /dev/null +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include + +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +TEST(testCase, tSmaEncodeDecodeTest) { + // encode + STSma tSma = {0}; + tSma.version = 0; + tSma.intervalUnit = TD_TIME_UNIT_DAY; + tSma.interval = 1; + tSma.slidingUnit = TD_TIME_UNIT_HOUR; + tSma.sliding = 0; + tstrncpy(tSma.indexName, "sma_index_test", TSDB_INDEX_NAME_LEN); + tSma.tableUid = 1234567890; + tSma.numOfColIds = 2; + tSma.numOfFuncIds = 5; // sum/min/max/avg/last + tSma.colIds = (col_id_t *)calloc(tSma.numOfColIds, sizeof(col_id_t)); + tSma.funcIds = (uint16_t *)calloc(tSma.numOfFuncIds, sizeof(uint16_t)); + + for (int32_t i = 0; i < tSma.numOfColIds; ++i) { + *(tSma.colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID); + } + for (int32_t i = 0; i < tSma.numOfFuncIds; ++i) { + *(tSma.funcIds + i) = (i + 2); + } + + STSmaWrapper tSmaWrapper = {.number = 1, .tSma = &tSma}; + uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper); + + void *buf = calloc(bufLen, 1); + assert(buf != NULL); + + STSmaWrapper *pSW = (STSmaWrapper *)buf; + uint32_t len = tEncodeTSmaWrapper(&buf, &tSmaWrapper); + + EXPECT_EQ(len, bufLen); + + // decode + STSmaWrapper dstTSmaWrapper = {0}; + void * result = tDecodeTSmaWrapper(pSW, &dstTSmaWrapper); + assert(result != NULL); + + EXPECT_EQ(tSmaWrapper.number, dstTSmaWrapper.number); + + for (int i = 0; i < tSmaWrapper.number; ++i) { + STSma *pSma = tSmaWrapper.tSma + i; + STSma *qSma = dstTSmaWrapper.tSma + i; + + EXPECT_EQ(pSma->version, qSma->version); + EXPECT_EQ(pSma->intervalUnit, qSma->intervalUnit); + EXPECT_EQ(pSma->slidingUnit, qSma->slidingUnit); + EXPECT_STRCASEEQ(pSma->indexName, qSma->indexName); + EXPECT_EQ(pSma->numOfColIds, qSma->numOfColIds); + EXPECT_EQ(pSma->numOfFuncIds, qSma->numOfFuncIds); + EXPECT_EQ(pSma->tableUid, qSma->tableUid); + EXPECT_EQ(pSma->interval, qSma->interval); + EXPECT_EQ(pSma->sliding, qSma->sliding); + for (uint32_t j = 0; j < pSma->numOfColIds; ++j) { + EXPECT_EQ(*(col_id_t *)(pSma->colIds + j), *(col_id_t *)(qSma->colIds + j)); + } + for (uint32_t j = 0; j < pSma->numOfFuncIds; ++j) { + EXPECT_EQ(*(uint16_t *)(pSma->funcIds + j), *(uint16_t *)(qSma->funcIds + j)); + } + } + + // resource release + tdDestroyTSma(&tSma, false); + tdDestroyWrapper(&dstTSmaWrapper); +} + +#if 0 +TEST(testCase, tSmaInsertTest) { + STSma tSma = {0}; + STSmaData* pSmaData = NULL; + STsdb tsdb = {0}; + + // init + tSma.intervalUnit = TD_TIME_UNIT_DAY; + tSma.interval = 1; + tSma.numOfFuncIds = 5; // sum/min/max/avg/last + + int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t); + int32_t numOfColIds = 3; + int32_t numOfSmaBlocks = 10; + + int32_t dataLen = numOfColIds * numOfSmaBlocks * blockSize; + + pSmaData = (STSmaData*)malloc(sizeof(STSmaData) + dataLen); + ASSERT_EQ(pSmaData != NULL, true); + pSmaData->tableUid = 3232329230; + pSmaData->numOfColIds = numOfColIds; + pSmaData->numOfSmaBlocks = numOfSmaBlocks; + pSmaData->dataLen = dataLen; + pSmaData->tsWindow.skey = 1640000000; + pSmaData->tsWindow.ekey = 1645788649; + pSmaData->colIds = (col_id_t*)malloc(sizeof(col_id_t) * numOfColIds); + ASSERT_EQ(pSmaData->colIds != NULL, true); + + for (int32_t i = 0; i < numOfColIds; ++i) { + *(pSmaData->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID); + } + + // execute + EXPECT_EQ(tsdbInsertTSmaData(&tsdb, &tSma, pSmaData), TSDB_CODE_SUCCESS); + + // release + tdDestroySmaData(pSmaData); +} +#endif + +#pragma GCC diagnostic pop \ No newline at end of file