Merge branch 'refact/tsdb_last' of https://github.com/taosdata/TDengine into refact/tsdb_last
This commit is contained in:
commit
b2a87c87b6
|
@ -7,12 +7,13 @@ import Tabs from "@theme/Tabs";
|
||||||
import TabItem from "@theme/TabItem";
|
import TabItem from "@theme/TabItem";
|
||||||
import PkgListV3 from "/components/PkgListV3";
|
import PkgListV3 from "/components/PkgListV3";
|
||||||
|
|
||||||
|
您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
|
||||||
|
|
||||||
TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../reference/rest-api/)。
|
TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../reference/rest-api/)。
|
||||||
|
|
||||||
为方便使用,标准的服务端安装包包含了 taos、taosd、taosAdapter、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包。
|
为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包。
|
||||||
|
|
||||||
在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用 Docker 立即体验](../../get-started/docker/)。需要注意的是,rpm 和 deb 包不含 taosdump 和 TDinsight 安装脚本,这些工具需要通过安装 taosTool 包获得。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
|
|
||||||
|
|
||||||
|
在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。需要注意的是,rpm 和 deb 包不含 taosdump 和 TDinsight 安装脚本,这些工具需要通过安装 taosTool 包获得。TDengine 也提供 Windows x64 平台的安装包。
|
||||||
|
|
||||||
## 安装
|
## 安装
|
||||||
|
|
||||||
|
|
|
@ -2555,10 +2555,14 @@ typedef struct {
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
int64_t ntbUid;
|
int64_t ntbUid;
|
||||||
SArray* colIdList; // SArray<int16_t>
|
SArray* colIdList; // SArray<int16_t>
|
||||||
} SCheckAlterInfo;
|
} STqCheckInfo;
|
||||||
|
|
||||||
int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo);
|
int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo);
|
||||||
int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo);
|
int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
} STqDelCheckInfoReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
|
|
@ -188,7 +188,8 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ADD_CHECK_INFO, "vnode-add-check-info", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_VND_DELETE_CHECK_INFO, "vnode-delete-check-info", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
|
||||||
|
|
|
@ -515,7 +515,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
void streamMetaClose(SStreamMeta* streamMeta);
|
void streamMetaClose(SStreamMeta* streamMeta);
|
||||||
|
|
||||||
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||||
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
|
||||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
|
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
|
|
||||||
|
|
|
@ -4262,7 +4262,6 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) {
|
int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) {
|
||||||
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
|
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
|
||||||
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
|
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
|
||||||
|
@ -4300,7 +4299,6 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pRe
|
||||||
tEndDecode(decoder);
|
tEndDecode(decoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
|
int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
@ -5590,7 +5588,6 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 1
|
|
||||||
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
||||||
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
|
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
|
||||||
snprintf(buf, maxLen, "offset(reset to none)");
|
snprintf(buf, maxLen, "offset(reset to none)");
|
||||||
|
@ -5609,7 +5606,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
||||||
if (pLeft->type == pRight->type) {
|
if (pLeft->type == pRight->type) {
|
||||||
|
@ -5643,7 +5639,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) {
|
int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) {
|
||||||
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;
|
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;
|
||||||
int32_t sz = taosArrayGetSize(pInfo->colIdList);
|
int32_t sz = taosArrayGetSize(pInfo->colIdList);
|
||||||
|
@ -5655,7 +5651,7 @@ int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo)
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) {
|
int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
|
||||||
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
|
||||||
int32_t sz;
|
int32_t sz;
|
||||||
|
|
|
@ -225,7 +225,8 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -361,7 +361,8 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -15,10 +15,10 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndOffset.h"
|
#include "mndOffset.h"
|
||||||
#include "mndPrivilege.h"
|
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
#include "mndTopic.h"
|
#include "mndTopic.h"
|
||||||
|
@ -305,7 +305,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
sdbRelease(pSdb, pOffset);
|
sdbRelease(pSdb, pOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
|
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
|
||||||
|
|
|
@ -57,7 +57,8 @@ int32_t mndInitTopic(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_ADD_CHECK_INFO_RSP, mndTransProcessRsp);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_VND_DELETE_CHECK_INFO_RSP, mndTransProcessRsp);
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
|
||||||
|
@ -450,7 +451,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
if (topicObj.ntbUid != 0) {
|
if (topicObj.ntbUid != 0) {
|
||||||
SCheckAlterInfo info;
|
STqCheckInfo info;
|
||||||
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
|
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
|
||||||
info.ntbUid = topicObj.ntbUid;
|
info.ntbUid = topicObj.ntbUid;
|
||||||
info.colIdList = topicObj.ntbColIds;
|
info.colIdList = topicObj.ntbColIds;
|
||||||
|
@ -470,7 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
// encoder check alter info
|
// encoder check alter info
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code);
|
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
@ -481,7 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) {
|
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -493,7 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
action.pCont = buf;
|
action.pCont = buf;
|
||||||
action.contLen = sizeof(SMsgHead) + len;
|
action.contLen = sizeof(SMsgHead) + len;
|
||||||
action.msgType = TDMT_VND_CHECK_ALTER_INFO;
|
action.msgType = TDMT_VND_ADD_CHECK_INFO;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
@ -659,12 +660,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
// TODO check if rebalancing
|
// TODO check if rebalancing
|
||||||
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||||
|
@ -675,6 +678,37 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTopic->ntbUid != 0) {
|
||||||
|
// broadcast to all vnode
|
||||||
|
void *pIter = NULL;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
|
||||||
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
|
||||||
|
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
action.pCont = buf;
|
||||||
|
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
|
||||||
|
action.msgType = TDMT_VND_DELETE_CHECK_INFO;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
|
||||||
|
|
|
@ -117,16 +117,15 @@ typedef struct {
|
||||||
struct STQ {
|
struct STQ {
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
char* path;
|
char* path;
|
||||||
SHashObj* pushMgr; // consumerId -> STqHandle*
|
SHashObj* pPushMgr; // consumerId -> STqHandle*
|
||||||
SHashObj* handles; // subKey -> STqHandle
|
SHashObj* pHandle; // subKey -> STqHandle
|
||||||
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
|
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
|
||||||
|
|
||||||
STqOffsetStore* pOffsetStore;
|
STqOffsetStore* pOffsetStore;
|
||||||
|
|
||||||
TDB* pMetaStore;
|
TDB* pMetaDB;
|
||||||
TTB* pExecStore;
|
TTB* pExecStore;
|
||||||
|
TTB* pCheckStore;
|
||||||
TTB* pAlterInfoStore;
|
|
||||||
|
|
||||||
SStreamMeta* pStreamMeta;
|
SStreamMeta* pStreamMeta;
|
||||||
};
|
};
|
||||||
|
@ -155,6 +154,9 @@ int32_t tqMetaClose(STQ* pTq);
|
||||||
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
|
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
|
||||||
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
|
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
|
||||||
int32_t tqMetaRestoreHandle(STQ* pTq);
|
int32_t tqMetaRestoreHandle(STQ* pTq);
|
||||||
|
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
|
||||||
|
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
|
||||||
|
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t size;
|
int32_t size;
|
||||||
|
|
|
@ -163,13 +163,16 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
|
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
|
||||||
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen);
|
// tq-mq
|
||||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver);
|
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
|
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
|
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
|
// tq-stream
|
||||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
|
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
|
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
|
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
|
||||||
|
|
|
@ -60,11 +60,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
pTq->path = strdup(path);
|
pTq->path = strdup(path);
|
||||||
pTq->pVnode = pVnode;
|
pTq->pVnode = pVnode;
|
||||||
|
|
||||||
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
if (tqMetaOpen(pTq) < 0) {
|
if (tqMetaOpen(pTq) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -85,9 +85,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
void tqClose(STQ* pTq) {
|
void tqClose(STQ* pTq) {
|
||||||
if (pTq) {
|
if (pTq) {
|
||||||
tqOffsetClose(pTq->pOffsetStore);
|
tqOffsetClose(pTq->pOffsetStore);
|
||||||
taosHashCleanup(pTq->handles);
|
taosHashCleanup(pTq->pHandle);
|
||||||
taosHashCleanup(pTq->pushMgr);
|
taosHashCleanup(pTq->pPushMgr);
|
||||||
taosHashCleanup(pTq->pAlterInfo);
|
taosHashCleanup(pTq->pCheckInfo);
|
||||||
taosMemoryFree(pTq->path);
|
taosMemoryFree(pTq->path);
|
||||||
tqMetaClose(pTq);
|
tqMetaClose(pTq);
|
||||||
streamMetaClose(pTq->pStreamMeta);
|
streamMetaClose(pTq->pStreamMeta);
|
||||||
|
@ -183,7 +183,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver) {
|
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
||||||
|
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
||||||
|
pLeft->val.version <= pRight->val.version;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
STqOffset offset = {0};
|
STqOffset offset = {0};
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, msg, msgLen);
|
tDecoderInit(&decoder, msg, msgLen);
|
||||||
|
@ -199,19 +204,24 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
|
||||||
} else if (offset.val.type == TMQ_OFFSET__LOG) {
|
} else if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||||
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
|
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
|
||||||
TD_VID(pTq->pVnode), offset.val.version);
|
TD_VID(pTq->pVnode), offset.val.version);
|
||||||
|
if (offset.val.version + 1 == version) {
|
||||||
|
offset.val.version += 1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
/*STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);*/
|
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
|
||||||
/*if (pOffset != NULL) {*/
|
if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
|
||||||
/*if (pOffset->val.type == TMQ_OFFSET__LOG && pOffset->val.version < offset.val.version) {*/
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (offset.val.type == TMQ_OFFSET__LOG) {
|
if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
|
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -220,6 +230,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rsp
|
||||||
|
|
||||||
/*}*/
|
/*}*/
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
||||||
|
@ -229,15 +241,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
|
||||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pAlterInfo, pIter);
|
pIter = taosHashIterate(pTq->pCheckInfo, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
SCheckAlterInfo* pCheck = (SCheckAlterInfo*)pIter;
|
STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
|
||||||
if (pCheck->ntbUid == tbUid) {
|
if (pCheck->ntbUid == tbUid) {
|
||||||
int32_t sz = taosArrayGetSize(pCheck->colIdList);
|
int32_t sz = taosArrayGetSize(pCheck->colIdList);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
|
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
|
||||||
if (forbidColId == colId) {
|
if (forbidColId == colId) {
|
||||||
taosHashCancelIterate(pTq->pAlterInfo, pIter);
|
taosHashCancelIterate(pTq->pCheckInfo, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -289,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SWalCkHead* pCkHead = NULL;
|
SWalCkHead* pCkHead = NULL;
|
||||||
|
|
||||||
// 1.find handle
|
// 1.find handle
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
/*ASSERT(pHandle);*/
|
/*ASSERT(pHandle);*/
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
|
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
|
||||||
|
@ -478,10 +490,10 @@ OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||||
|
|
||||||
int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
|
|
||||||
tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
|
tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
|
||||||
|
@ -492,27 +504,43 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
SCheckAlterInfo info = {0};
|
STqCheckInfo info = {0};
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, msg, msgLen);
|
tDecoderInit(&decoder, msg, msgLen);
|
||||||
if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) {
|
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) {
|
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
|
if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
SMqRebVgReq req = {0};
|
SMqRebVgReq req = {0};
|
||||||
tDecodeSMqRebVgReq(msg, &req);
|
tDecodeSMqRebVgReq(msg, &req);
|
||||||
// todo lock
|
// todo lock
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
if (req.oldConsumerId != -1) {
|
if (req.oldConsumerId != -1) {
|
||||||
tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
|
tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
|
||||||
|
@ -579,7 +607,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
|
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
}
|
}
|
||||||
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||||
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
|
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
// TODO
|
// TODO
|
||||||
|
@ -668,34 +696,9 @@ FAIL:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
//
|
//
|
||||||
return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen);
|
return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
|
||||||
#if 0
|
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
|
||||||
if (pTask == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
SDecoder decoder;
|
|
||||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
|
||||||
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
|
|
||||||
if (tqExpandTask(pTq, pTask) < 0) {
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
FAIL:
|
|
||||||
if (pTask) taosMemoryFree(pTask);
|
|
||||||
return -1;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
||||||
|
@ -817,7 +820,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||||
|
|
||||||
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
|
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
|
||||||
|
|
|
@ -43,6 +43,185 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaOpen(STQ* pTq) {
|
||||||
|
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tqMetaRestoreHandle(pTq) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tqMetaRestoreCheckInfo(pTq) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaClose(STQ* pTq) {
|
||||||
|
if (pTq->pExecStore) {
|
||||||
|
tdbTbClose(pTq->pExecStore);
|
||||||
|
}
|
||||||
|
if (pTq->pCheckStore) {
|
||||||
|
tdbTbClose(pTq->pCheckStore);
|
||||||
|
}
|
||||||
|
tdbClose(pTq->pMetaDB);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) {
|
||||||
|
TXN txn;
|
||||||
|
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
|
||||||
|
TXN txn;
|
||||||
|
|
||||||
|
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn) < 0) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
|
||||||
|
TBC* pCur = NULL;
|
||||||
|
if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pKey = NULL;
|
||||||
|
int kLen = 0;
|
||||||
|
void* pVal = NULL;
|
||||||
|
int vLen = 0;
|
||||||
|
SDecoder decoder;
|
||||||
|
|
||||||
|
tdbTbcMoveToFirst(pCur);
|
||||||
|
|
||||||
|
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||||
|
STqCheckInfo info;
|
||||||
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
|
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tdbTbcClose(pCur);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
||||||
|
int32_t code;
|
||||||
|
int32_t vlen;
|
||||||
|
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
|
||||||
|
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, strlen(pHandle->subKey), pHandle->consumerId,
|
||||||
|
TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
|
void* buf = taosMemoryCalloc(1, vlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, buf, vlen);
|
||||||
|
|
||||||
|
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
TXN txn;
|
||||||
|
|
||||||
|
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
|
||||||
|
TXN txn;
|
||||||
|
|
||||||
|
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
TBC* pCur = NULL;
|
TBC* pCur = NULL;
|
||||||
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
||||||
|
@ -93,101 +272,10 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
}
|
}
|
||||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
|
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
|
||||||
taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqMetaOpen(STQ* pTq) {
|
|
||||||
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tqMetaRestoreHandle(pTq) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqMetaClose(STQ* pTq) {
|
|
||||||
if (pTq->pExecStore) {
|
|
||||||
tdbTbClose(pTq->pExecStore);
|
|
||||||
}
|
|
||||||
tdbClose(pTq->pMetaStore);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
|
||||||
int32_t code;
|
|
||||||
int32_t vlen;
|
|
||||||
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
|
|
||||||
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, strlen(pHandle->subKey), pHandle->consumerId,
|
|
||||||
TD_VID(pTq->pVnode));
|
|
||||||
|
|
||||||
void* buf = taosMemoryCalloc(1, vlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
SEncoder encoder;
|
|
||||||
tEncoderInit(&encoder, buf, vlen);
|
|
||||||
|
|
||||||
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
TXN txn;
|
|
||||||
|
|
||||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
taosMemoryFree(buf);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
|
|
||||||
TXN txn;
|
|
||||||
|
|
||||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) {
|
|
||||||
/*ASSERT(0);*/
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
|
@ -394,7 +394,7 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->handles, pIter);
|
pIter = taosHashIterate(pTq->pHandle, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
STqHandle* pExec = (STqHandle*)pIter;
|
STqHandle* pExec = (STqHandle*)pIter;
|
||||||
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
|
|
@ -165,9 +165,9 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
||||||
STQ* pTq = pWriter->pTq;
|
STQ* pTq = pWriter->pTq;
|
||||||
|
|
||||||
if (rollback) {
|
if (rollback) {
|
||||||
ASSERT(0);
|
tdbAbort(pWriter->pTq->pMetaDB, &pWriter->txn);
|
||||||
} else {
|
} else {
|
||||||
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
|
code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -196,36 +196,42 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
break;
|
break;
|
||||||
/* TQ */
|
/* TQ */
|
||||||
case TDMT_VND_MQ_VG_CHANGE:
|
case TDMT_VND_MQ_VG_CHANGE:
|
||||||
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessVgChangeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_MQ_VG_DELETE:
|
case TDMT_VND_MQ_VG_DELETE:
|
||||||
if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
|
if (tqProcessVgDeleteReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_MQ_COMMIT_OFFSET:
|
case TDMT_VND_MQ_COMMIT_OFFSET:
|
||||||
if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
pMsg->contLen - sizeof(SMsgHead), version) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_CHECK_ALTER_INFO:
|
case TDMT_VND_ADD_CHECK_INFO:
|
||||||
if (tqProcessCheckAlterInfoReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case TDMT_VND_DELETE_CHECK_INFO:
|
||||||
|
if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_STREAM_TASK_DEPLOY: {
|
case TDMT_STREAM_TASK_DEPLOY: {
|
||||||
if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessTaskDeployReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_TASK_DROP: {
|
case TDMT_STREAM_TASK_DROP: {
|
||||||
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
|
if (tqProcessTaskDropReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
|
|
|
@ -192,6 +192,24 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t countTrailingSpaces(const SValueNode* pVal, bool isLtrim) {
|
||||||
|
int32_t numOfSpaces = 0;
|
||||||
|
int32_t len = varDataLen(pVal->datum.p);
|
||||||
|
char* str = varDataVal(pVal->datum.p);
|
||||||
|
|
||||||
|
int32_t startPos = isLtrim ? 0 : len - 1;
|
||||||
|
int32_t step = isLtrim ? 1 : -1;
|
||||||
|
for (int32_t i = startPos; i < len || i >= 0; i += step) {
|
||||||
|
if (!isspace(str[i])) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
numOfSpaces++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return numOfSpaces;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
void static addTimezoneParam(SNodeList* pList) {
|
void static addTimezoneParam(SNodeList* pList) {
|
||||||
char buf[6] = {0};
|
char buf[6] = {0};
|
||||||
time_t t = taosTime(NULL);
|
time_t t = taosTime(NULL);
|
||||||
|
@ -293,6 +311,40 @@ static int32_t translateInOutStr(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateTrimStr(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isLtrim) {
|
||||||
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
if (!IS_VAR_DATA_TYPE(pPara1->resType.type)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfSpaces = 0;
|
||||||
|
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
// for select trim functions with constant value from table,
|
||||||
|
// need to set the proper result result schema bytes to avoid
|
||||||
|
// trailing garbage characters
|
||||||
|
if (nodeType(pParamNode1) == QUERY_NODE_VALUE) {
|
||||||
|
SValueNode* pValue = (SValueNode*)pParamNode1;
|
||||||
|
numOfSpaces = countTrailingSpaces(pValue, isLtrim);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t resBytes = pPara1->resType.bytes - numOfSpaces;
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = resBytes, .type = pPara1->resType.type};
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateLtrim(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
return translateTrimStr(pFunc, pErrBuf, len, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateRtrim(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
return translateTrimStr(pFunc, pErrBuf, len, false);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateLogarithm(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateLogarithm(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||||
if (1 != numOfParams && 2 != numOfParams) {
|
if (1 != numOfParams && 2 != numOfParams) {
|
||||||
|
@ -2827,7 +2879,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.name = "ltrim",
|
.name = "ltrim",
|
||||||
.type = FUNCTION_TYPE_LTRIM,
|
.type = FUNCTION_TYPE_LTRIM,
|
||||||
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
||||||
.translateFunc = translateInOutStr,
|
.translateFunc = translateLtrim,
|
||||||
.getEnvFunc = NULL,
|
.getEnvFunc = NULL,
|
||||||
.initFunc = NULL,
|
.initFunc = NULL,
|
||||||
.sprocessFunc = ltrimFunction,
|
.sprocessFunc = ltrimFunction,
|
||||||
|
@ -2837,7 +2889,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.name = "rtrim",
|
.name = "rtrim",
|
||||||
.type = FUNCTION_TYPE_RTRIM,
|
.type = FUNCTION_TYPE_RTRIM,
|
||||||
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
|
||||||
.translateFunc = translateInOutStr,
|
.translateFunc = translateRtrim,
|
||||||
.getEnvFunc = NULL,
|
.getEnvFunc = NULL,
|
||||||
.initFunc = NULL,
|
.initFunc = NULL,
|
||||||
.sprocessFunc = rtrimFunction,
|
.sprocessFunc = rtrimFunction,
|
||||||
|
|
|
@ -758,7 +758,9 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
|
||||||
res->datum.p = taosMemoryCalloc(len, 1);
|
res->datum.p = taosMemoryCalloc(len, 1);
|
||||||
memcpy(res->datum.p, output.columnData->pData, len);
|
memcpy(res->datum.p, output.columnData->pData, len);
|
||||||
} else if (IS_VAR_DATA_TYPE(type)) {
|
} else if (IS_VAR_DATA_TYPE(type)) {
|
||||||
res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
|
//res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
|
||||||
|
res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData), 1);
|
||||||
|
res->node.resType.bytes = varDataTLen(output.columnData->pData);
|
||||||
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
|
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
|
||||||
} else {
|
} else {
|
||||||
nodesSetValueNodeValue(res, output.columnData->pData);
|
nodesSetValueNodeValue(res, output.columnData->pData);
|
||||||
|
|
|
@ -81,7 +81,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
||||||
taosMemoryFree(pMeta);
|
taosMemoryFree(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen) {
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -205,28 +205,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
||||||
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
|
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
|
||||||
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
||||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
|
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
|
||||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
|
||||||
do { \
|
|
||||||
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
|
|
||||||
uint64_t ahandle = head->ahandle; \
|
|
||||||
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
|
|
||||||
transClearBuffer(&conn->readBuf); \
|
|
||||||
transFreeMsg(transContFromHead((char*)head)); \
|
|
||||||
if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \
|
|
||||||
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \
|
|
||||||
if (cliMsg->type == Release) return; \
|
|
||||||
} \
|
|
||||||
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); \
|
|
||||||
if (T_REF_VAL_GET(conn) > 1) { \
|
|
||||||
transUnrefCliHandle(conn); \
|
|
||||||
} \
|
|
||||||
destroyCmsg(pMsg); \
|
|
||||||
cliReleaseUnfinishedMsg(conn); \
|
|
||||||
transQueueClear(&conn->cliMsgs); \
|
|
||||||
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \
|
|
||||||
return; \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
|
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -358,7 +336,6 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
if (cliRecvReleaseReq(conn, pHead)) {
|
if (cliRecvReleaseReq(conn, pHead)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
CONN_SHOULD_RELEASE(conn, pHead);
|
|
||||||
|
|
||||||
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
||||||
pMsg = transQueuePop(&conn->cliMsgs);
|
pMsg = transQueuePop(&conn->cliMsgs);
|
||||||
|
@ -1418,7 +1395,7 @@ int transReleaseCliHandle(void* handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STransMsg tmsg = {.info.handle = handle};
|
STransMsg tmsg = {.info.handle = handle};
|
||||||
// TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
|
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
|
||||||
|
|
||||||
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||||
cmsg->msg = tmsg;
|
cmsg->msg = tmsg;
|
||||||
|
|
Loading…
Reference in New Issue