Merge branch '3.0' of github.com:taosdata/TDengine into szhou/tms-wc/save-row-simplebuf

This commit is contained in:
slzhou 2024-02-22 10:03:47 +08:00
commit a769bc7217
15 changed files with 606 additions and 563 deletions

View File

@ -20,13 +20,13 @@ import CDemo from "./_sub_c.mdx";
# 介绍
## 主题
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../12-taos-sql/13-tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../taos-sql/tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。
如下图,每个 topic 涉及到的数据表可能分布在多个 vnode相当于 kafka 里的 partition 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。
![img_5.png](img_5.png)
TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 [CREATE DATABASE](../../12-taos-sql/02-database) 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。
TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 [CREATE DATABASE](../../taos-sql/database) 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。
对于 `SELECT` 语句形式的 topic在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
@ -66,7 +66,7 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
# 语法说明
具体的语法参见 [数据订阅](../../12-taos-sql/13-tmq)
具体的语法参见 [数据订阅](../../taos-sql/tmq)
# 消费参数
@ -91,7 +91,6 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
不同语言下, TMQ 订阅相关的 API 及数据结构如下详细的接口说明可以参考连接器章节注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
@ -147,6 +146,9 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
DLL_EXPORT const char *tmq_err2str(int32_t code);
```
下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
</TabItem>
<TabItem value="java" label="Java">
@ -304,6 +306,7 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
void Close()
```
</TabItem>
</Tabs>
@ -334,8 +337,8 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
对于不同编程语言,其设置方式如下:
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
@ -352,8 +355,8 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
```
</TabItem>
</TabItem>
<TabItem value="java" label="Java">
对于 Java 程序,还可以使用如下配置项:
@ -387,6 +390,7 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
```
</TabItem>
<TabItem label="Go" value="Go">
@ -500,7 +504,6 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
一个 consumer 支持同时订阅多个 topic。
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
@ -580,7 +583,6 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
以下代码展示了不同语言下如何对 TMQ 消息进行消费。
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
@ -717,7 +719,6 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
消费结束后,应当取消订阅。
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c

View File

@ -26,6 +26,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
bool isLeader, bool restored);
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);

View File

@ -88,6 +88,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -837,6 +837,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -29,7 +29,7 @@
#define MND_STREAM_MAX_NUM 60
typedef struct SMStreamNodeCheckMsg {
typedef struct {
int8_t placeHolder; // // to fix windows compile error, define place holder
} SMStreamNodeCheckMsg;
@ -1545,6 +1545,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}
}
mInfo("stream:%s,%"PRId64 " start to pause stream", pauseReq.name, pStream->uid);
if (pStream->status == STREAM_STATUS__PAUSE) {
sdbRelease(pMnode->pSdb, pStream);
return 0;

View File

@ -16,6 +16,10 @@
#include "mndStream.h"
#include "mndTrans.h"
typedef struct {
SMsgHead head;
} SMStreamHbRspMsg;
typedef struct SFailedCheckpointInfo {
int64_t streamUid;
int64_t checkpointId;
@ -211,7 +215,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
mInfo("receive pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name);
mInfo("receive pause stream:%s, %s, %"PRId64 ", because grant expired", pStream->name, reqPause.name, pStream->uid);
}
sdbRelease(pSdb, pStream);
@ -222,8 +226,8 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
SArray *pFailedTasks = NULL;
SArray *pOrphanTasks = NULL;
if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) {
if (suspendAllStreams(pMnode, &pReq->info) < 0) {
@ -244,6 +248,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
taosThreadMutexLock(&execInfo.lock);
// extract stream task list
@ -349,5 +356,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosArrayDestroy(pFailedTasks);
taosArrayDestroy(pOrphanTasks);
{
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead* pHead = rsp.pCont;
pHead->vgId = htonl(req.vgId);
tmsgSendRsp(&rsp);
pReq->info.handle = NULL; // disable auto rsp
}
return TSDB_CODE_SUCCESS;
}

View File

@ -178,6 +178,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return tqStreamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true);
case TDMT_STREAM_TASK_CHECKPOINT_READY:
return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg);
case TDMT_MND_STREAM_HEARTBEAT_RSP:
return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg);
default:
sndError("invalid snode msg:%d", pMsg->msgType);
ASSERT(0);

View File

@ -242,6 +242,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);

View File

@ -1220,3 +1220,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
}
// this function is needed, do not try to remove it.
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg);
}

View File

@ -612,9 +612,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
streamMetaReleaseTask(pMeta, pTask);
}
streamMetaWLock(pMeta);
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, false);
streamMetaWUnLock(pMeta);
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, true);
// drop the stream task now
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
@ -939,3 +937,10 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
return taosArrayGetSize(pMeta->pTaskList);
}
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return TSDB_CODE_SUCCESS;
}

View File

@ -796,6 +796,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY:
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_HEARTBEAT_RSP:
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;

View File

@ -164,7 +164,7 @@ void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, ESt
} else {
pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey);
}
SSessionKey tmpKey = {0};
SSessionKey tmpKey = {.groupId = pKey->groupId, .win.ekey = INT64_MIN, .win.skey = INT64_MIN};
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
pAggSup->stateStore.streamStateFreeCur(pCur);

View File

@ -1160,7 +1160,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
}
tEncoderClear(&encoder);
SRpcMsg msg = {.info.noResp = 1};
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
pMeta->pHbInfo->hbCount += 1;

View File

@ -756,14 +756,18 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
}
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool metaLock) {
if (pTask == NULL) {
return TSDB_CODE_SUCCESS;
}
SStreamMeta* pMeta = pTask->pMeta;
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
if (pTask->info.fillHistory == 0) {
return 0;
return TSDB_CODE_SUCCESS;
}
if (metaLock) {
streamMetaWLock(pTask->pMeta);
streamMetaWLock(pMeta);
}
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
@ -784,7 +788,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool
}
if (metaLock) {
streamMetaWUnLock(pTask->pMeta);
streamMetaWUnLock(pMeta);
}
return TSDB_CODE_SUCCESS;

View File

@ -961,7 +961,7 @@ static void cliSendCb(uv_write_t* req, int status) {
tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
}
}
if (pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) {
if (pMsg != NULL && pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) {
rpcFreeCont(pMsg->msg.pCont);
pMsg->msg.pCont = 0;
}