Merge branch 'main' into fix/analysis

This commit is contained in:
Haojun Liao 2025-02-14 15:37:11 +08:00
commit 73ec4af4a7
16 changed files with 353 additions and 585 deletions


@ -69,6 +69,8 @@ TDengine 是一款开源、高性能、云原生的时序数据库 (Time-Series
TDengine 目前可以在 Linux、Windows、macOS 等平台上安装和运行。任何 OS 的应用也可以选择 taosAdapter 的 RESTful 接口连接服务端 taosd。CPU 支持 X64/ARM64,后续会支持 MIPS64、Alpha64、ARM32、RISC-V 等 CPU 架构。目前不支持使用交叉编译器构建。
如果你想要编译 taosAdapter 或者 taosKeeper,需要安装 Go 1.18 及以上版本。
## 3.1 Linux 系统
<details>
@ -153,6 +155,10 @@ cmake .. -DBUILD_TOOLS=true -DBUILD_CONTRIB=true
make
```
如果你想要编译 taosAdapter,需要添加 `-DBUILD_HTTP=false` 选项。
如果你想要编译 taosKeeper,需要添加 `-DBUILD_KEEPER=true` 选项。
可以使用 Jemalloc 作为内存分配器,而不是使用 glibc:
```bash
@ -180,6 +186,10 @@ mkdir debug && cd debug
cmake .. && cmake --build .
```
如果你想要编译 taosAdapter,需要添加 `-DBUILD_HTTP=false` 选项。
如果你想要编译 taosKeeper,需要添加 `-DBUILD_KEEPER=true` 选项。
</details>
## 4.3 Windows 系统上构建


@ -82,6 +82,8 @@ For contributing/building/testing TDengine Connectors, please check the followin
At the moment, TDengine server supports running on Linux/Windows/MacOS systems. Any application can also choose the RESTful interface provided by taosAdapter to connect the taosd service. TDengine supports X64/ARM64 CPU, and it will support MIPS64, Alpha64, ARM32, RISC-V and other CPU architectures in the future. Right now we don't support build with cross-compiling environment.
If you want to compile taosAdapter or taosKeeper, you need to install Go 1.18 or above.
## 3.1 On Linux
<details>
@ -168,6 +170,10 @@ cmake .. -DBUILD_TOOLS=true -DBUILD_CONTRIB=true
make
```
If you want to compile taosAdapter, you need to add the `-DBUILD_HTTP=false` option.
If you want to compile taosKeeper, you need to add the `-DBUILD_KEEPER=true` option.
You can use Jemalloc as the memory allocator instead of glibc:
```bash
@ -196,6 +202,10 @@ mkdir debug && cd debug
cmake .. && cmake --build .
```
If you want to compile taosAdapter, you need to add the `-DBUILD_HTTP=false` option.
If you want to compile taosKeeper, you need to add the `-DBUILD_KEEPER=true` option.
</details>
## 4.3 Build on Windows


@ -490,7 +490,7 @@ TDengine 客户端驱动提供了应用编程所需要的全部 API并且在
- 支持版本:从 v3.3.4.3 版本开始引入
#### bypassFlag
- 说明:配置文件所在目录
- 说明:用于短路测试 `内部参数`
- 类型:整数;
- 取值范围:0:正常写入;1:写入消息在 taos 客户端发送 RPC 消息前返回;2:写入消息在 taosd 服务端收到 RPC 消息后返回;4:写入消息在 taosd 服务端写入内存缓存前返回;8:写入消息在 taosd 服务端数据落盘前返回
- 默认值:0
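下面给出一个假设性的客户端配置片段,演示在 taos.cfg 中设置该参数(参数名与取值以上文说明为准,具体配置方式请以官方文档为准):

```cfg
bypassFlag 1   // 写入消息在 taos 客户端发送 RPC 消息前即返回,仅用于短路测试
```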


@ -37,6 +37,9 @@ database_option: {
| WAL_FSYNC_PERIOD value
| WAL_RETENTION_PERIOD value
| WAL_RETENTION_SIZE value
| COMPACT_INTERVAL value
| COMPACT_TIME_RANGE value
| COMPACT_TIME_OFFSET value
}
```
@ -81,6 +84,10 @@ database_option: {
- WAL_FSYNC_PERIOD:当 WAL_LEVEL 参数设置为 2 时,用于设置落盘的周期。默认为 3000,单位毫秒。最小为 0,表示每次写入立即落盘;最大为 180000,即三分钟。
- WAL_RETENTION_PERIOD:为了数据订阅消费,需要 WAL 日志文件额外保留的最大时长策略。WAL 日志清理,不受订阅客户端消费状态影响。单位为 s。默认为 3600,表示在 WAL 保留最近 3600 秒的数据,请根据数据订阅的需要修改这个参数为适当值。
- WAL_RETENTION_SIZE:为了数据订阅消费,需要 WAL 日志文件额外保留的最大累计大小策略。单位为 KB。默认为 0,表示累计大小无上限。
- COMPACT_INTERVAL:自动 compact 触发周期(从 1970-01-01T00:00:00Z 开始切分的时间周期)。取值范围:0 或 [10m, keep2],单位:m(分钟)、h(小时)、d(天)。不加时间单位时默认单位为天,默认值为 0,即不触发自动 compact 功能。如果 db 中有未完成的 compact 任务,不重复下发 compact 任务。仅企业版 3.3.5.0 版本开始支持。
- COMPACT_TIME_RANGE:自动 compact 任务触发的 compact 时间范围,取值范围:[-keep2, -duration],单位:m(分钟)、h(小时)、d(天)。不加时间单位时默认单位为天,默认值为 [0, 0]。取默认值 [0, 0] 时,如果 COMPACT_INTERVAL 大于 0,会按照 [-keep2, -duration] 下发自动 compact。因此,要关闭自动 compact 功能,需要将 COMPACT_INTERVAL 设置为 0。仅企业版 3.3.5.0 版本开始支持。
- COMPACT_TIME_OFFSET:自动 compact 任务触发的 compact 时间相对本地时间的偏移量。取值范围:[0,23],单位:h(小时),默认值为 0。以 UTC 0 时区为例,如果 COMPACT_INTERVAL 为 1d,当 COMPACT_TIME_OFFSET 为 0 时,在每天 0 点下发自动 compact;如果 COMPACT_TIME_OFFSET 为 2,在每天 2 点下发自动 compact。仅企业版 3.3.5.0 版本开始支持。
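上述三个自动 compact 参数可以组合使用。下面是一个假设性的建库示例(库名与取值均为示意,语法以本节 CREATE DATABASE 说明为准):

```sql
-- 每 10 天触发一次自动 compact,compact 时间范围为最近 [-60d, -1d],
-- 并在本地时间每天 2 点下发(仅企业版 3.3.5.0 版本开始支持)
CREATE DATABASE db COMPACT_INTERVAL 10d COMPACT_TIME_RANGE -60d,-1d COMPACT_TIME_OFFSET 2;
```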
### 创建数据库示例
@ -127,6 +134,9 @@ alter_database_option: {
| WAL_RETENTION_PERIOD value
| WAL_RETENTION_SIZE value
| MINROWS value
| COMPACT_INTERVAL value
| COMPACT_TIME_RANGE value
| COMPACT_TIME_OFFSET value
}
```


@ -1016,15 +1016,15 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;


@ -396,7 +396,8 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
tqNotifyClose(pVnode->pImpl->pTq);
dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);


@ -1158,51 +1158,22 @@ int32_t extractStreamNodeList(SMnode *pMnode) {
}
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
bool ready = true;
int32_t code = 0;
if (mndStreamNodeIsUpdated(pMnode)) {
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
streamMutexLock(&execInfo.lock);
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
if (taosArrayGetSize(execInfo.pTaskList) != 0) {
streamMutexUnlock(&execInfo.lock);
mError("stream task node change checking done, no vgroups exist, but task list is not empty");
return TSDB_CODE_FAILED;
}
}
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
if (p == NULL) {
continue;
}
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
}
if (pEntry->status != TASK_STATUS__READY) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
ready = false;
break;
}
if (pEntry->hTaskId != 0) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
" exists, checkpoint not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
pEntry->hTaskId);
ready = false;
break;
code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
}
streamMutexUnlock(&execInfo.lock);
return ready ? 0 : -1;
return code;
}
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
@ -1216,7 +1187,22 @@ int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
continue;
}
if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) {
// -1 denote not ready now or never ready till now
if (pEntry->hTaskId != 0) {
mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
" exists, checkpoint not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
pEntry->hTaskId);
return -1;
}
if (pEntry->status != TASK_STATUS__READY) {
mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId,
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
return -1;
}
if (ts < pEntry->startTime) {
ts = pEntry->startTime;
taskId = pEntry->id.taskId;
}
@ -1249,11 +1235,11 @@ static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
if (lastReadyTs != -1) {
mInfo("not start checkpoint, stream:0x%"PRIx64" last ready ts:%"PRId64" ready duration:%"PRId64" less than threshold",
pStream->uid, lastReadyTs, now - lastReadyTs);
} else {
mInfo("not start checkpoint, stream:0x%"PRIx64" not ready now", pStream->uid);
mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64
"ms less than threshold",
pStream->uid, lastReadyTs, (now - lastReadyTs));
}
ready = false;
@ -1274,7 +1260,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
int32_t numOfCheckpointTrans = 0;
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
@ -1326,7 +1312,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
}
int32_t numOfQual = taosArrayGetSize(pList);
if (numOfCheckpointTrans > tsMaxConcurrentCheckpoint) {
if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
mDebug(
"%d stream(s) checkpoint interval longer than %ds, ongoing checkpoint trans:%d reach maximum allowed:%d, new "
"checkpoint trans are not allowed, wait for 30s",
@ -2601,20 +2587,51 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId,
}
}
static int32_t doCleanReqList(SArray* pList, SCheckpointConsensusInfo* pInfo) {
int32_t alreadySend = taosArrayGetSize(pList);
for (int32_t i = 0; i < alreadySend; ++i) {
int32_t *taskId = taosArrayGet(pList, i);
if (taskId == NULL) {
continue;
}
for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
if ((pe != NULL) && (pe->req.taskId == *taskId)) {
taosArrayRemove(pInfo->pTaskList, k);
break;
}
}
}
return alreadySend;
}
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
int64_t now = taosGetTimestampMs();
bool allReady = true;
SArray *pNodeSnapshot = NULL;
int32_t maxAllowedTrans = 50;
int32_t numOfTrans = 0;
int32_t code = 0;
void *pIter = NULL;
SArray *pList = taosArrayInit(4, sizeof(int32_t));
if (pList == NULL) {
return terrno;
}
SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
if (pStreamList == NULL) {
taosArrayDestroy(pList);
return terrno;
}
mDebug("start to process consensus-checkpointId in tmr");
bool allReady = true;
SArray *pNodeSnapshot = NULL;
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
taosArrayDestroy(pNodeSnapshot);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");
@ -2623,28 +2640,30 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
if (!allReady) {
mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
taosArrayDestroy(pStreamList);
taosArrayDestroy(pList);
return 0;
}
streamMutexLock(&execInfo.lock);
void *pIter = NULL;
while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;
int64_t streamId = -1;
int32_t num = taosArrayGetSize(pInfo->pTaskList);
SArray *pList = taosArrayInit(4, sizeof(int32_t));
if (pList == NULL) {
continue;
}
taosArrayClear(pList);
int64_t streamId = -1;
int32_t num = taosArrayGetSize(pInfo->pTaskList);
SStreamObj *pStream = NULL;
code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
if (pStream == NULL || code != 0) { // stream has been dropped already
mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
void *p = taosArrayPush(pStreamList, &pInfo->streamId);
taosArrayDestroy(pList);
if (p == NULL) {
mError("failed to record the missing stream id in concensus-stream list, streamId:%" PRId64
" code:%s, continue",
pInfo->streamId, tstrerror(terrno));
}
continue;
}
@ -2654,7 +2673,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
continue;
}
streamId = pe->req.streamId;
if (streamId == -1) {
streamId = pe->req.streamId;
}
int32_t existed = 0;
bool allSame = true;
@ -2665,7 +2686,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
break;
}
if (((now - pe->ts) >= 10 * 1000) || allSame) {
if (((now - pe->ts) >= 10 * 1000) && allSame) {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
pe->req.startTs, (now - pe->ts) / 1000.0);
if (chkId > pe->req.checkpointId) {
@ -2673,8 +2694,12 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
taosArrayDestroy(pStreamList);
mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
pe->req.checkpointId, chkId);
mndReleaseStream(pMnode, pStream);
taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
return TSDB_CODE_FAILED;
}
code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
@ -2684,7 +2709,6 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
if (p == NULL) {
mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
}
streamId = pe->req.streamId;
} else {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
pe->req.startTs, (now - pe->ts) / 1000.0);
@ -2693,38 +2717,27 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
mndReleaseStream(pMnode, pStream);
if (taosArrayGetSize(pList) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
int32_t *taskId = taosArrayGet(pList, i);
if (taskId == NULL) {
continue;
}
for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
if ((pe != NULL) && (pe->req.taskId == *taskId)) {
taosArrayRemove(pInfo->pTaskList, k);
break;
}
}
}
}
taosArrayDestroy(pList);
int32_t alreadySend = doCleanReqList(pList, pInfo);
// clear request stream item with empty task list
if (taosArrayGetSize(pInfo->pTaskList) == 0) {
mndClearConsensusRspEntry(pInfo);
if (streamId == -1) {
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pStreamList);
mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);
return TSDB_CODE_FAILED;
mError("streamId is -1, streamId:%" PRIx64" in consensus-checkpointId hashMap, cont", pInfo->streamId);
}
void *p = taosArrayPush(pStreamList, &streamId);
if (p == NULL) {
mError("failed to put into stream list, stream:0x%" PRIx64, streamId);
mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId);
}
}
numOfTrans += alreadySend;
if (numOfTrans > maxAllowedTrans) {
mInfo("already send consensus-checkpointId trans:%d, try next time", alreadySend);
taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
break;
}
}
for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
@ -2739,7 +2752,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pStreamList);
mDebug("end to process consensus-checkpointId in tmr");
taosArrayDestroy(pList);
mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans);
return code;
}


@ -814,17 +814,18 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
int64_t ts) {
char msg[128] = {0};
char msg[128] = {0};
STrans *pTrans = NULL;
SStreamTask *pTask = NULL;
snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);
STrans *pTrans = NULL;
int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
if (pTrans == NULL || code != 0) {
return terrno;
}
STaskId id = {.streamId = pStream->uid, .taskId = taskId};
SStreamTask *pTask = NULL;
STaskId id = {.streamId = pStream->uid, .taskId = taskId};
code = mndGetStreamTask(&id, pStream, &pTask);
if (code) {
mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);


@ -422,7 +422,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
reqs.nReqs = taosArrayGetSize(reqs.pArray);
code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s failed to send create table msg", id);
tqError("s-task:%s failed to send create table msg, code:%s", id, tstrerror(code));
}
_end:
@ -861,6 +861,8 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
int32_t vgId = TD_VID(pVnode);
int64_t suid = pTask->outputInfo.tbSink.stbUid;
const char* id = pTask->id.idStr;
int32_t timeout = 300; // 5min
int64_t start = taosGetTimestampSec();
while (pTableSinkInfo->uid == 0) {
if (streamTaskShouldStop(pTask)) {
@ -868,6 +870,12 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
return TSDB_CODE_STREAM_EXEC_CANCELLED;
}
int64_t waitingDuration = taosGetTimestampSec() - start;
if (waitingDuration > timeout) {
tqError("s-task:%s wait for table-creating:%s more than %dsec, failed", id, dstTableName, timeout);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
// wait for the table to be created
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);


@ -947,20 +947,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY:
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY:
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER:
return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
case TDMT_VND_GET_STREAM_PROGRESS:
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
@ -987,6 +975,18 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn
return tqProcessTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_CHECK_RSP:
return tqProcessTaskCheckRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY:
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER:
return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;


@ -38,7 +38,7 @@ extern "C" {
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
#define STREAM_TASK_QUEUE_CAPACITY 5120
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (10)
// clang-format off
#define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)


@ -131,12 +131,12 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg);
if (code != 0) {
rpcFreeCont(buf);
return code;
stError("s-task:%s (child %d) failed to send retrieve req to task:0x%x (vgId:%d) QID:0x%" PRIx64 " code:%s",
pTask->id.idStr, pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId, tstrerror(code));
} else {
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId);
}
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId);
}
return code;


@ -807,6 +807,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
return 0;
}
int64_t st = taosGetTimestampMs();
EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
if (ret == EXEC_AFTER_IDLE) {
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
@ -841,8 +843,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
continue;
}
int64_t st = taosGetTimestampMs();
// here only handle the data block sink operation
if (type == STREAM_INPUT__DATA_BLOCK) {
pTask->execInfo.sink.dataSize += blockSize;
@ -873,6 +873,13 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
if (code) {
return code;
}
double el = (taosGetTimestampMs() - st) / 1000.0;
if (el > 5.0) { // elapsed more than 5 sec, not occupy the CPU anymore
stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id);
streamTaskSetIdleInfo(pTask, 500);
return code;
}
}
}
}


@ -190,7 +190,7 @@ system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791223004,5,2,3,1.1);
loop4:
sleep 1000
run tsim/stream/checkTaskStatus.sim
sql select * from streamt;


@ -1,267 +1,123 @@
# TaosKeeper
<!-- omit in toc -->
# taosKeeper
taosKeeper 是 TDengine 各项监控指标的导出工具,通过简单的几项配置即可获取 TDengine 的运行状态。并且 taosKeeper 企业版支持多种收集器,可以方便进行监控数据的展示。
[![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/taosdata/TDengine/taoskeeper-ci-build.yml)](https://github.com/taosdata/TDengine/actions/workflows/taoskeeper-ci-build.yml)
![GitHub commit activity](https://img.shields.io/github/commit-activity/m/taosdata/TDengine)
![GitHub License](https://img.shields.io/github/license/taosdata/TDengine)
![GitHub Release](https://img.shields.io/github/v/release/taosdata/tdengine)
<br />
[![Twitter Follow](https://img.shields.io/twitter/follow/tdenginedb?label=TDengine&style=social)](https://twitter.com/tdenginedb)
[![YouTube Channel](https://img.shields.io/badge/Subscribe_@tdengine--white?logo=youtube&style=social)](https://www.youtube.com/@tdengine)
[![Discord Community](https://img.shields.io/badge/Join_Discord--white?logo=discord&style=social)](https://discord.com/invite/VZdSuUg4pS)
[![LinkedIn](https://img.shields.io/badge/Follow_LinkedIn--white?logo=linkedin&style=social)](https://www.linkedin.com/company/tdengine)
[![StackOverflow](https://img.shields.io/badge/Ask_StackOverflow--white?logo=stackoverflow&style=social&logoColor=orange)](https://stackoverflow.com/questions/tagged/tdengine)
taosKeeper 使用 TDengine RESTful 接口,所以不需要安装 TDengine 客户端即可使用。
简体中文 | [English](./README.md)
## 构建
<!-- omit in toc -->
## 目录
### 获取源码
- [1. 简介](#1-简介)
- [2. 文档](#2-文档)
- [3. 前置条件](#3-前置条件)
- [4. 构建](#4-构建)
- [5. 测试](#5-测试)
- [5.1 运行测试](#51-运行测试)
- [5.2 添加用例](#52-添加用例)
- [5.3 性能测试](#53-性能测试)
- [6. CI/CD](#6-cicd)
- [7. 提交 Issues](#7-提交-issues)
- [8. 提交 PR](#8-提交-pr)
- [9. 引用](#9-引用)
- [10. 许可证](#10-许可证)
从 GitHub 克隆源码:
## 1. 简介
```sh
git clone https://github.com/taosdata/TDengine
cd TDengine/tools/keeper
```
taosKeeper 是 TDengine 3.0 版本全新引入的监控指标导出工具,旨在方便用户对 TDengine 的运行状态和性能指标进行实时监控。只需进行简单配置,TDengine 就能将自身的运行状态和各项指标等信息上报给 taosKeeper。taosKeeper 在接收到监控数据后,会利用 taosAdapter 提供的 RESTful 接口,将这些数据存储到 TDengine 中。
### 编译
taosKeeper 的一个重要价值在于,它能够将多个甚至一批 TDengine 集群的监控数据集中存储到一个统一的平台。如此一来,监控软件便能轻松获取这些数据,进而实现对 TDengine 集群的全面监控与实时分析。通过 taosKeeper用户可以更加便捷地了解 TDengine 的运行状况,及时发现并解决潜在问题,确保系统的稳定性和高效性。
taosKeeper 使用 `GO` 语言编写,在构建前需要配置好 `GO` 语言开发环境。
## 2. 文档
```sh
go mod tidy
- 使用 taosKeeper,请参考 [taosKeeper 参考手册](https://docs.taosdata.com/reference/components/taoskeeper/),其中包括安装、配置、启动、数据收集与监控,以及集成 Prometheus 等方面的内容。
- 本 README 主要面向希望自行贡献代码、编译和测试 taosKeeper 的开发者。如果想要学习 TDengine,可以浏览 [官方文档](https://docs.taosdata.com/)。
## 3. 前置条件
1. 已安装 Go 1.18 及以上版本。
2. 本地已部署 TDengine,具体步骤请参考 [部署服务端](https://docs.taosdata.com/get-started/package/),且已启动 taosd 与 taosAdapter。
## 4. 构建
在 `TDengine/tools/keeper` 目录下运行以下命令以构建项目:
```bash
go build
```
## 安装
## 5. 测试
如果是自行构建的项目,仅需要拷贝 `taoskeeper` 文件到你的 `PATH` 中。
### 5.1 运行测试
```sh
sudo install taoskeeper /usr/bin/
在 `TDengine/tools/keeper` 目录下执行以下命令运行测试:
```bash
sudo go test ./...
```
## 启动
测试用例将连接到本地的 TDengine 服务器和 taosAdapter 进行测试。测试完成后,你将看到类似如下的结果摘要。如果所有测试用例均通过,输出中将不会出现 `FAIL` 字样。
在启动前,应该做好如下配置:
在 `/etc/taos/taoskeeper.toml` 中配置 TDengine 连接参数以及监控指标前缀等其他信息。
```toml
# gin 框架是否启用 debug
debug = false
# 服务监听端口, 默认为 6043
port = 6043
# 日志级别,包含 panic、error、info、debug、trace等
loglevel = "info"
# 程序中使用协程池的大小
gopoolsize = 50000
# 查询 TDengine 监控数据轮询间隔
RotationInterval = "15s"
[tdengine]
host = "127.0.0.1"
port = 6041
username = "root"
password = "taosdata"
# 需要被监控的 taosAdapter
[taosAdapter]
address = ["127.0.0.1:6041"]
[metrics]
# 监控指标前缀
prefix = "taos"
# 存放监控数据的数据库
database = "log"
# 指定需要监控的普通表
tables = []
[environment]
# 是否在容器中运行,影响 taosKeeper 自身的监控数据
incgroup = false
```text
ok github.com/taosdata/taoskeeper/api 17.405s
ok github.com/taosdata/taoskeeper/cmd 1.819s
ok github.com/taosdata/taoskeeper/db 0.484s
ok github.com/taosdata/taoskeeper/infrastructure/config 0.417s
ok github.com/taosdata/taoskeeper/infrastructure/log 0.785s
ok github.com/taosdata/taoskeeper/monitor 4.623s
ok github.com/taosdata/taoskeeper/process 0.606s
ok github.com/taosdata/taoskeeper/system 3.420s
ok github.com/taosdata/taoskeeper/util 0.097s
ok github.com/taosdata/taoskeeper/util/pool 0.146s
```
现在可以启动服务,输入:
### 5.2 添加用例
```sh
taoskeeper
```
在以 `_test.go` 结尾的文件中添加测试用例,并且确保新增代码都有对应的测试用例覆盖。
如果你使用 `systemd`,复制 `taoskeeper.service` 到 `/lib/systemd/system/`,并启动服务。
### 5.3 性能测试
```sh
sudo cp taoskeeper.service /lib/systemd/system/
sudo systemctl daemon-reload
sudo systemctl start taoskeeper
```
性能测试正在开发中。
让 taosKeeper 随系统开机自启动。
## 6. CI/CD
```sh
sudo systemctl enable taoskeeper
```
- [Build Workflow](https://github.com/taosdata/TDengine/actions/workflows/taoskeeper-ci-build.yml)
- Code Coverage - TODO
如果使用 `systemd`,你可以使用如下命令完成安装。
## 7. 提交 Issues
```sh
go mod tidy
go build
sudo install taoskeeper /usr/bin/
sudo cp taoskeeper.service /lib/systemd/system/
sudo systemctl daemon-reload
sudo systemctl start taoskeeper
sudo systemctl enable taoskeeper
```
我们欢迎提交 [GitHub Issue](https://github.com/taosdata/TDengine/issues)。提交时请尽量提供以下信息,以便快速定位问题:
## Docker
- 问题描述:具体问题表现及是否必现,建议附上详细调用堆栈或日志信息。
- taosKeeper 版本:可通过 `taoskeeper -V` 获取版本信息。
- TDengine 服务端版本:可通过 `taos -V` 获取版本信息。
如下介绍了如何在 Docker 中构建 taosKeeper:
如有其它相关信息(如环境配置、操作系统版本等),请一并补充,以便我们更全面地了解问题。
在构建前请配置好 `./config/taoskeeper.toml` 中合适的参数,并编辑 Dockerfile,示例如下。
## 8. 提交 PR
```dockerfile
FROM golang:1.18.6-alpine as builder
我们欢迎开发者共同参与本项目开发,提交 PR 时请按照以下步骤操作:
WORKDIR /usr/src/taoskeeper
COPY ./ /usr/src/taoskeeper/
ENV GO111MODULE=on \
GOPROXY=https://goproxy.cn,direct
RUN go mod tidy && go build
1. Fork 仓库:请先 Fork 本仓库,具体步骤请参考 [如何 Fork 仓库](https://docs.github.com/en/get-started/quickstart/fork-a-repo)。
2. 创建新分支:基于 `main` 分支创建一个新分支,并使用有意义的分支名称(例如:`git checkout -b feature/my_feature`)。请勿直接在 main 分支上进行修改。
3. 开发与测试:完成代码修改后,确保所有单元测试都能通过,并为新增功能或修复的 Bug 添加相应的测试用例。
4. 提交代码:将修改提交到远程分支(例如:`git push origin feature/my_feature`)。
5. 创建 Pull Request在 GitHub 上发起 [Pull Request](https://github.com/taosdata/TDengine/pulls),具体步骤请参考 [如何创建 Pull Request](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request)。
6. 检查 CI提交 PR 后,可在 Pull Request 中找到自己提交的 PR点击对应的链接即可查看该 PR 的 CI 是否通过。若通过,会显示 `All checks have passed`。无论 CI 是否通过,均可点击 `Show all checks -> Details` 查看详细的测试用例日志。
FROM alpine:3
RUN mkdir -p /etc/taos
COPY --from=builder /usr/src/taoskeeper/taoskeeper /usr/bin/
COPY ./config/taoskeeper.toml /etc/taos/taoskeeper.toml
EXPOSE 6043
CMD ["taoskeeper"]
```
## 9. 引用
如果已经有 taosKeeper 可执行文件,在配置好 `taoskeeper.toml` 后你可以使用如下方式构建:
[TDengine 官网](https://www.taosdata.com/)
```dockerfile
FROM ubuntu:18.04
RUN mkdir -p /etc/taos
COPY ./taoskeeper /usr/bin/
COPY ./taoskeeper.toml /etc/taos/taoskeeper.toml
EXPOSE 6043
CMD ["taoskeeper"]
```
## 10. 许可证
## 使用(**企业版**)
### Prometheus (by scrape)
taosKeeper 可以像 `node-exporter` 一样向 Prometheus 提供监控指标。\
在 `/etc/prometheus/prometheus.yml` 中添加配置:
```yml
global:
scrape_interval: 5s
scrape_configs:
- job_name: "taoskeeper"
static_configs:
- targets: ["taoskeeper:6043"]
```
现在使用 PromQL 查询即可显示结果,比如要查看指定主机(通过 FQDN 正则匹配表达式筛选)硬盘使用百分比:
```promql
taos_dn_disk_used / taos_dn_disk_total {fqdn=~ "tdengine.*"}
```
你可以使用 `docker-compose` 测试完整的链路。
`docker-compose.yml` 示例:
```yml
version: "3.7"
services:
tdengine:
image: tdengine/tdengine
environment:
TAOS_FQDN: tdengine
volumes:
- taosdata:/var/lib/taos
taoskeeper:
build: ./
depends_on:
- tdengine
environment:
TDENGINE_HOST: tdengine
TDENGINE_PORT: 6041
volumes:
- ./config/taoskeeper.toml:/etc/taos/taoskeeper.toml
ports:
- 6043:6043
prometheus:
image: prom/prometheus
volumes:
- ./prometheus/:/etc/prometheus/
ports:
- 9090:9090
volumes:
taosdata:
```
启动:
```sh
docker-compose up -d
```
现在通过访问 <http://localhost:9090> 来查询结果。访问 [simple dashboard](https://grafana.com/grafana/dashboards/15164) 来查看 taosKeeper + Prometheus + Grafana 监控 TDengine 的快速启动实例。
### Telegraf
如果使用 telegraf 来收集各个指标,仅需要在配置中增加:
```toml
[[inputs.prometheus]]
## An array of urls to scrape metrics from.
urls = ["http://taoskeeper:6043/metrics"]
```
可以通过 `docker-compose` 来测试:
```sh
docker-compose -f docker-compose.yml -f telegraf.yml up -d telegraf taoskeeper
```
由于可以在 `telegraf.conf` 中设置日志为标准输出:
```toml
[[outputs.file]]
files = ["stdout"]
```
所以你可以通过 `docker-compose logs` 在标准输出中追踪 TDengine 各项指标。
```sh
docker-compose -f docker-compose.yml -f telegraf.yml logs -f telegraf
```
### Zabbix
1. 导入 zabbix 模板文件 `zbx_taos_keeper_templates.xml`。
2. 使用 `TDengine` 模板来创建主机,修改宏 `{$TAOSKEEPER_HOST}` 和 `{$COLLECTION_INTERVAL}`。
3. 等待并查看到自动创建的条目。
### 常见问题
* 启动报错,显示 connection refused
**解析**:taosKeeper 依赖 RESTful 接口查询数据,请检查 taosAdapter 是否正常运行,或 taoskeeper.toml 中 taosAdapter 地址是否正确。
* taosKeeper 监控不同 TDengine 显示的检测指标数目不一致?
**解析**:如果 TDengine 中未创建某项指标,taosKeeper 不能获取对应的检测结果。
* 不能接收到 TDengine 的监控日志。
**解析**: 修改 `/etc/taos/taos.cfg` 文件并增加如下参数:
```cfg
monitor 1 // 启用 monitor
monitorInterval 30 // 发送间隔(s)
monitorFqdn localhost // 接收消息的 FQDN,默认为空
monitorPort 6043 // 接收消息的端口号
monitorMaxLogs 100 // 每个监控间隔缓存的最大日志数量
```
[AGPL-3.0 License](../../LICENSE)


@ -1,273 +1,123 @@
<!-- omit in toc -->
# taosKeeper

[![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/taosdata/TDengine/taoskeeper-ci-build.yml)](https://github.com/taosdata/TDengine/actions/workflows/taoskeeper-ci-build.yml)
![GitHub commit activity](https://img.shields.io/github/commit-activity/m/taosdata/TDengine)
![GitHub License](https://img.shields.io/github/license/taosdata/TDengine)
![GitHub Release](https://img.shields.io/github/v/release/taosdata/tdengine)
<br />
[![Twitter Follow](https://img.shields.io/twitter/follow/tdenginedb?label=TDengine&style=social)](https://twitter.com/tdenginedb)
[![YouTube Channel](https://img.shields.io/badge/Subscribe_@tdengine--white?logo=youtube&style=social)](https://www.youtube.com/@tdengine)
[![Discord Community](https://img.shields.io/badge/Join_Discord--white?logo=discord&style=social)](https://discord.com/invite/VZdSuUg4pS)
[![LinkedIn](https://img.shields.io/badge/Follow_LinkedIn--white?logo=linkedin&style=social)](https://www.linkedin.com/company/tdengine)
[![StackOverflow](https://img.shields.io/badge/Ask_StackOverflow--white?logo=stackoverflow&style=social&logoColor=orange)](https://stackoverflow.com/questions/tagged/tdengine)

English | [简体中文](./README-CN.md)

<!-- omit in toc -->
## Table of Contents

- [1. Introduction](#1-introduction)
- [2. Documentation](#2-documentation)
- [3. Prerequisites](#3-prerequisites)
- [4. Build](#4-build)
  - [4.1 Install](#41-install)
  - [4.2 Start](#42-start)
  - [4.3 Docker](#43-docker)
- [5. Testing](#5-testing)
  - [5.1 Test Execution](#51-test-execution)
  - [5.2 Test Case Addition](#52-test-case-addition)
  - [5.3 Performance Testing](#53-performance-testing)
- [6. CI/CD](#6-cicd)
- [7. Submitting Issues](#7-submitting-issues)
- [8. Submitting PR](#8-submitting-pr)
- [9. References](#9-references)
- [10. License](#10-license)

## 1. Introduction

taosKeeper is a monitoring indicator export tool introduced in TDengine 3.0, designed to make it easy to monitor the operating status and performance indicators of TDengine in real time. With a simple configuration, TDengine reports its own operating status and various indicators to taosKeeper. After receiving the monitoring data, taosKeeper uses the RESTful interface provided by taosAdapter to store the data in TDengine.

An important value of taosKeeper is that it can store the monitoring data of multiple or even a batch of TDengine clusters in a unified platform, so that monitoring software can easily access this data and realize comprehensive monitoring and real-time analysis of TDengine clusters. Through taosKeeper, users can more easily understand the operating status of TDengine, discover and solve potential problems in a timely manner, and ensure the stability and efficiency of the system.

## 2. Documentation

- To use taosKeeper, please refer to the [taosKeeper Reference](https://docs.tdengine.com/tdengine-reference/components/taoskeeper/), which covers installation, configuration, startup, data collection and monitoring, and Prometheus integration.
- This README is mainly for developers who want to contribute code, and to compile and test taosKeeper. If you want to learn TDengine, you can browse the [official documentation](https://docs.tdengine.com/).

## 3. Prerequisites

1. Go 1.18 or above has been installed.
2. TDengine has been deployed locally. For specific steps, please refer to [Deploy TDengine](https://docs.tdengine.com/get-started/deploy-from-package/); taosd and taosAdapter must be started.

## 4. Build

taosKeeper uses the TDengine RESTful API, so it can be built without the TDengine client library.

Get the source code:

```sh
git clone https://github.com/taosdata/TDengine
cd TDengine/tools/keeper
```

Run the following commands in the `TDengine/tools/keeper` directory to build the project:

```sh
go mod tidy
go build
```

### 4.1 Install

If you built the tool yourself, just copy the `taoskeeper` binary into your `PATH`:

```sh
sudo install taoskeeper /usr/bin/
```

### 4.2 Start

Before starting, configure options such as the database host and port, and the prefix for exported metrics, in `/etc/taos/taoskeeper.toml`:

```toml
# Start with debug middleware for gin
debug = false

# Listen port, default is 6043
port = 6043

# log level
loglevel = "info"

# go pool size
gopoolsize = 50000

# interval for TDengine metrics
RotationInterval = "15s"

[tdengine]
host = "127.0.0.1"
port = 6041
username = "root"
password = "taosdata"

# list of taosAdapter instances that need to be monitored
[taosAdapter]
address = ["127.0.0.1:6041"]

[metrics]
# prefix in metric names
prefix = "taos"

# database for storing metrics data
database = "log"

# export some tables that are not super tables
tables = []

[environment]
# whether running in cgroup
incgroup = false
```

Now you can run the tool:

```sh
taoskeeper
```

If you use `systemd`, copy `taoskeeper.service` to `/lib/systemd/system/` and start the service:

```sh
sudo cp taoskeeper.service /lib/systemd/system/
sudo systemctl daemon-reload
sudo systemctl start taoskeeper
```

To start taoskeeper automatically after a reboot, enable the systemd service:

```sh
sudo systemctl enable taoskeeper
```

So if you use `systemd`, you can install it with these commands all in one go:

```sh
go mod tidy
go build
sudo install taoskeeper /usr/bin/
sudo cp taoskeeper.service /lib/systemd/system/
sudo systemctl daemon-reload
sudo systemctl start taoskeeper
sudo systemctl enable taoskeeper
```

### 4.3 Docker

Here is an example of how to build this tool in Docker.

Before building, configure `./config/taoskeeper.toml` with the proper parameters and edit the Dockerfile. Take the following as an example:

```dockerfile
FROM golang:1.18.2 as builder

WORKDIR /usr/src/taoskeeper
COPY ./ /usr/src/taoskeeper/
ENV GO111MODULE=on \
    GOPROXY=https://goproxy.cn,direct
RUN go mod tidy && go build

FROM alpine:3
RUN mkdir -p /etc/taos
COPY --from=builder /usr/src/taoskeeper/taoskeeper /usr/bin/
COPY ./config/taoskeeper.toml /etc/taos/taoskeeper.toml
EXPOSE 6043
CMD ["taoskeeper"]
```

If you already have the taosKeeper binary, you can build the image like this:

```dockerfile
FROM ubuntu:18.04
RUN mkdir -p /etc/taos
COPY ./taoskeeper /usr/bin/
COPY ./taoskeeper.toml /etc/taos/taoskeeper.toml
EXPOSE 6043
CMD ["taoskeeper"]
```

## 5. Testing

### 5.1 Test Execution

Run the tests by executing the following command in the `TDengine/tools/keeper` directory:

```bash
sudo go test ./...
```

The test cases connect to the local TDengine server and taosAdapter. After the tests complete, you will see a result summary similar to the following. If all test cases pass, there will be no `FAIL` in the output.

```text
ok github.com/taosdata/taoskeeper/api 17.405s
ok github.com/taosdata/taoskeeper/cmd 1.819s
ok github.com/taosdata/taoskeeper/db 0.484s
ok github.com/taosdata/taoskeeper/infrastructure/config 0.417s
ok github.com/taosdata/taoskeeper/infrastructure/log 0.785s
ok github.com/taosdata/taoskeeper/monitor 4.623s
ok github.com/taosdata/taoskeeper/process 0.606s
ok github.com/taosdata/taoskeeper/system 3.420s
ok github.com/taosdata/taoskeeper/util 0.097s
ok github.com/taosdata/taoskeeper/util/pool 0.146s
```

### 5.2 Test Case Addition

Add test cases in files ending with `_test.go` and make sure the new code is covered by the corresponding test cases.

### 5.3 Performance Testing

Performance testing is under development.

## 6. CI/CD

- [Build Workflow](https://github.com/taosdata/TDengine/actions/workflows/taoskeeper-ci-build.yml)
- Code Coverage - TODO

## 7. Submitting Issues

We welcome submissions of [GitHub Issues](https://github.com/taosdata/TDengine/issues). Please provide the following information when submitting so that the problem can be located quickly:

- Problem description: the specific symptoms and whether they are reproducible. It is recommended to attach a detailed call stack or log information.
- taosKeeper version: you can get the version information through `taoskeeper -V`.
- TDengine server version: you can get the version information through `taos -V`.

If you have other relevant information (such as environment configuration or operating system version), please add it so that we can understand the problem more comprehensively.

## 8. Submitting PR

We welcome developers to participate in the development of this project. Please follow the steps below when submitting a PR:

1. Fork the repository: please fork this repository first. For specific steps, please refer to [How to Fork a Repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo).
2. Create a new branch: create a new branch based on the `main` branch and use a meaningful branch name (for example: `git checkout -b feature/my_feature`). Do not modify the `main` branch directly.
3. Development and testing: after completing the code modification, make sure that all unit tests pass, and add corresponding test cases for new features or fixed bugs.
4. Submit code: push the changes to the remote branch (for example: `git push origin feature/my_feature`).
5. Create a Pull Request: initiate a [Pull Request](https://github.com/taosdata/TDengine/pulls) on GitHub. For specific steps, please refer to [How to Create a Pull Request](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request).
6. Check CI: after submitting the PR, you can find it in the Pull Request list and click the corresponding link to check whether its CI has passed. If it has passed, it will show `All checks have passed`. Regardless of whether CI has passed or not, you can click `Show all checks` / `Details` to view the detailed test case logs.

## 9. References

[TDengine Official Website](https://www.tdengine.com/)
## Usage (**Enterprise Edition**)
### Prometheus (by scrape)
taosKeeper acts as a Prometheus exporter, like `node-exporter`.
Here's how to add it to the scrape configs in `/etc/prometheus/prometheus.yml`:
```yml
global:
scrape_interval: 5s
scrape_configs:
- job_name: "taoskeeper"
static_configs:
- targets: [ "taoskeeper:6043" ]
```
Now PromQL queries will return results. For example, to show the disk usage percentage of a specific host (filtered with an FQDN regex match expression):
```promql
taos_dn_disk_used / taos_dn_disk_total {fqdn=~ "tdengine.*"}
```
You can use `docker-compose` with the provided `docker-compose.yml` to test the whole stack.
Here is the `docker-compose.yml`:
```yml
version: "3.7"
services:
tdengine:
image: tdengine/tdengine
environment:
TAOS_FQDN: tdengine
volumes:
- taosdata:/var/lib/taos
taoskeeper:
build: ./
depends_on:
- tdengine
environment:
TDENGINE_HOST: tdengine
TDENGINE_PORT: 6041
volumes:
- ./config/taoskeeper.toml:/etc/taos/taoskeeper.toml
ports:
- 6043:6043
prometheus:
image: prom/prometheus
volumes:
- ./prometheus/:/etc/prometheus/
ports:
- 9090:9090
volumes:
taosdata:
```
Start the stack:
```sh
docker-compose up -d
```
Now you can point your browser to <http://localhost:9090> (if you have not started a Prometheus server yourself) and run queries.
For a quick demo with taosKeeper + Prometheus + Grafana, we provide a [simple dashboard](https://grafana.com/grafana/dashboards/15164) to monitor TDengine.
### Telegraf
If you are using telegraf to collect metrics, just add inputs like this:
```toml
[[inputs.prometheus]]
## An array of urls to scrape metrics from.
urls = ["http://taoskeeper:6043/metrics"]
```
You can test it with `docker-compose`:
```sh
docker-compose -f docker-compose.yml -f telegraf.yml up -d telegraf taoskeeper
```
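The `telegraf.yml` override file is not shown here; a minimal sketch of what it could look like, assuming the stock `telegraf` image and a local `telegraf.conf` containing the `inputs.prometheus` block above:

```yml
version: "3.7"
services:
  telegraf:
    image: telegraf:latest
    depends_on:
      - taoskeeper
    volumes:
      - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
```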
Since we have configured a stdout file output in `telegraf.conf`:
```toml
[[outputs.file]]
files = ["stdout"]
```
you can follow the TDengine metrics on standard output with `docker-compose logs`:
```sh
docker-compose -f docker-compose.yml -f telegraf.yml logs -f telegraf
```
### Zabbix
1. Import the zabbix template file `zbx_taos_keeper_templates.xml`.
2. Use the template `TDengine` to create the host and modify the macros `{$TAOSKEEPER_HOST}`
and `{$COLLECTION_INTERVAL}`.
3. Wait for the monitoring items to be created automatically.
### FAQ
* Error occurred: connection refused while taosKeeper was starting
  **Answer**: taosKeeper relies on the RESTful interface to query data. Check whether taosAdapter is running, and whether the taosAdapter address in `taoskeeper.toml` is correct.
* Why is the number of metrics reported for different TDengine instances inconsistent in taosKeeper monitoring?
  **Answer**: If a metric has not been created in TDengine, taosKeeper cannot collect the corresponding results.
* Cannot receive monitoring logs from the TDengine server.
  **Answer**: Modify the `/etc/taos/taos.cfg` file and add the following parameters:
```cfg
monitor 1 // enable monitoring
monitorInterval 30 // send interval (s)
monitorFqdn localhost // FQDN of the host running taosKeeper
monitorPort 6043 // taosKeeper port
monitorMaxLogs 100 // maximum number of logs cached per monitoring interval
```
## 10. License

[AGPL-3.0 License](../../LICENSE)