diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 9cd2446ba9..c1a67f0182 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -7,12 +7,13 @@ import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; import PkgListV3 from "/components/PkgListV3"; +您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. + TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../reference/rest-api/)。 -为方便使用,标准的服务端安装包包含了 taos、taosd、taosAdapter、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包。 - -在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用 Docker 立即体验](../../get-started/docker/)。需要注意的是,rpm 和 deb 包不含 taosdump 和 TDinsight 安装脚本,这些工具需要通过安装 taosTool 包获得。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. +为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包。 +在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。需要注意的是,rpm 和 deb 包不含 taosdump 和 TDinsight 安装脚本,这些工具需要通过安装 taosTool 包获得。TDengine 也提供 Windows x64 平台的安装包。 ## 安装 diff --git a/include/common/tmsg.h b/include/common/tmsg.h index cc15d4ed6b..fa6f4b6c79 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2555,10 +2555,14 @@ typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; int64_t ntbUid; SArray* colIdList; // SArray -} SCheckAlterInfo; +} STqCheckInfo; -int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo); -int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo); +int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo); +int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo); + +typedef struct { + char topic[TSDB_TOPIC_FNAME_LEN]; +} STqDelCheckInfoReq; typedef struct { int32_t vgId; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6462c7afbf..b16df0e885 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -188,7 +188,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset) - TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_ADD_CHECK_INFO, "vnode-add-check-info", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DELETE_CHECK_INFO, "vnode-delete-check-info", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index eac92d76ba..e6fcb021d5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -515,7 +515,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen); +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7dd3ce34c3..533d924546 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4262,7 +4262,6 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp tDecoderClear(&decoder); return 0; } - int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) { if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1; if (tEncodeI64(encoder, pOffset->offset) < 0) return -1; @@ -4300,7 +4299,6 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pRe tEndDecode(decoder); return 0; } - int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -5590,7 +5588,6 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { return 0; } -#if 1 int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { if (pVal->type == TMQ_OFFSET__RESET_NONE) { snprintf(buf, maxLen, "offset(reset to none)"); @@ -5609,7 +5606,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { } return 0; } -#endif bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { if (pLeft->type == pRight->type) { @@ -5643,7 +5639,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { return 0; } -int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) { +int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) { if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1; if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1; int32_t sz = taosArrayGetSize(pInfo->colIdList); @@ -5655,7 +5651,7 @@ int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) return pEncoder->pos; } -int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) { +int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) { if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1; if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1; int32_t sz; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 647af20fcf..ec761e6441 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -225,7 +225,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7c6807ab87..8eb3ed3901 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -361,7 +361,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index 9f6108004d..037a46345f 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -15,10 +15,10 @@ #define _DEFAULT_SOURCE #include "mndOffset.h" -#include "mndPrivilege.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" +#include "mndPrivilege.h" #include "mndShow.h" #include "mndStb.h" #include "mndTopic.h" @@ -305,7 +305,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { sdbRelease(pSdb, pOffset); } - return code; + return code; } int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 820bb4b636..ff208eae60 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -57,7 +57,8 @@ int32_t mndInitTopic(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_ADD_CHECK_INFO_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DELETE_CHECK_INFO_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); @@ -450,7 +451,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (topicObj.ntbUid != 0) { - SCheckAlterInfo info; + STqCheckInfo info; memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN); info.ntbUid = topicObj.ntbUid; info.colIdList = topicObj.ntbColIds; @@ -470,7 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * // encoder check alter info int32_t len; int32_t code; - tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code); + tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); if (code < 0) { sdbRelease(pSdb, pVgroup); mndTransDrop(pTrans); @@ -481,7 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, len); - if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) { + if (tEncodeSTqCheckInfo(&encoder, &info) < 0) { sdbRelease(pSdb, pVgroup); mndTransDrop(pTrans); return -1; @@ -493,7 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = buf; action.contLen = sizeof(SMsgHead) + len; - action.msgType = TDMT_VND_CHECK_ALTER_INFO; + action.msgType = TDMT_VND_ADD_CHECK_INFO; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); @@ -659,12 +660,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); +#if 0 if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { ASSERT(0); mndTransDrop(pTrans); mndReleaseTopic(pMnode, pTopic); return -1; } +#endif // TODO check if rebalancing if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { @@ -675,6 +678,37 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { return -1; } + if (pTopic->ntbUid != 0) { + // broadcast to all vnode + void *pIter = NULL; + SVgObj *pVgroup = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) { + sdbRelease(pSdb, pVgroup); + continue; + } + + void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); + memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN); + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = buf; + action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; + action.msgType = TDMT_VND_DELETE_CHECK_INFO; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(buf); + sdbRelease(pSdb, pVgroup); + mndTransDrop(pTrans); + return -1; + } + } + } + int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic); mndReleaseTopic(pMnode, pTopic); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index a1dba41c94..cb5ec7aabe 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -117,16 +117,15 @@ typedef struct { struct STQ { SVnode* pVnode; char* path; - SHashObj* pushMgr; // consumerId -> STqHandle* - SHashObj* handles; // subKey -> STqHandle - SHashObj* pAlterInfo; // topic -> SAlterCheckInfo + SHashObj* pPushMgr; // consumerId -> STqHandle* + SHashObj* pHandle; // subKey -> STqHandle + SHashObj* pCheckInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; - TDB* pMetaStore; + TDB* pMetaDB; TTB* pExecStore; - - TTB* pAlterInfoStore; + TTB* pCheckStore; SStreamMeta* pStreamMeta; }; @@ -155,6 +154,9 @@ int32_t tqMetaClose(STQ* pTq); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); int32_t tqMetaRestoreHandle(STQ* pTq); +int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen); +int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); +int32_t tqMetaRestoreCheckInfo(STQ* pTq); typedef struct { int32_t size; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 47e18732e0..0c7a08a2b5 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -163,13 +163,16 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); -int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver); +// tq-mq +int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); +// tq-stream +int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 112543e340..a98fea1988 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -60,11 +60,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->path = strdup(path); pTq->pVnode = pVnode; - pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); - pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); if (tqMetaOpen(pTq) < 0) { ASSERT(0); @@ -85,9 +85,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { void tqClose(STQ* pTq) { if (pTq) { tqOffsetClose(pTq->pOffsetStore); - taosHashCleanup(pTq->handles); - taosHashCleanup(pTq->pushMgr); - taosHashCleanup(pTq->pAlterInfo); + taosHashCleanup(pTq->pHandle); + taosHashCleanup(pTq->pPushMgr); + taosHashCleanup(pTq->pCheckInfo); taosMemoryFree(pTq->path); tqMetaClose(pTq); streamMetaClose(pTq->pStreamMeta); @@ -183,7 +183,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con return 0; } -int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver) { +static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { + return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && + pLeft->val.version <= pRight->val.version; +} + +int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { STqOffset offset = {0}; SDecoder decoder; tDecoderInit(&decoder, msg, msgLen); @@ -199,19 +204,24 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve } else if (offset.val.type == TMQ_OFFSET__LOG) { tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, TD_VID(pTq->pVnode), offset.val.version); + if (offset.val.version + 1 == version) { + offset.val.version += 1; + } } else { ASSERT(0); } - /*STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);*/ - /*if (pOffset != NULL) {*/ - /*if (pOffset->val.type == TMQ_OFFSET__LOG && pOffset->val.version < offset.val.version) {*/ + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); + if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) { + return 0; + } + if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { ASSERT(0); return -1; } if (offset.val.type == TMQ_OFFSET__LOG) { - STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey)); + STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); if (pHandle) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) { ASSERT(0); @@ -220,6 +230,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve } } + // rsp + /*}*/ /*}*/ @@ -229,15 +241,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { void* pIter = NULL; while (1) { - pIter = taosHashIterate(pTq->pAlterInfo, pIter); + pIter = taosHashIterate(pTq->pCheckInfo, pIter); if (pIter == NULL) break; - SCheckAlterInfo* pCheck = (SCheckAlterInfo*)pIter; + STqCheckInfo* pCheck = (STqCheckInfo*)pIter; if (pCheck->ntbUid == tbUid) { int32_t sz = taosArrayGetSize(pCheck->colIdList); for (int32_t i = 0; i < sz; i++) { int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i); if (forbidColId == colId) { - taosHashCancelIterate(pTq->pAlterInfo, pIter); + taosHashCancelIterate(pTq->pCheckInfo, pIter); return -1; } } @@ -289,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SWalCkHead* pCkHead = NULL; // 1.find handle - STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); + STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); /*ASSERT(pHandle);*/ if (pHandle == NULL) { tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId, @@ -478,10 +490,10 @@ OVER: return code; } -int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; - int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey)); + int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); ASSERT(code == 0); tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); @@ -492,27 +504,43 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { return 0; } -int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) { - SCheckAlterInfo info = {0}; - SDecoder decoder; +int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { + STqCheckInfo info = {0}; + SDecoder decoder; tDecoderInit(&decoder, msg, msgLen); - if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) { + if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } tDecoderClear(&decoder); - if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) { + if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } return 0; } -int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { + if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (tqMetaDeleteCheckInfo(pTq, msg) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; +} + +int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); // todo lock - STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey)); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey, @@ -579,7 +607,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); taosArrayDestroy(tbUidList); } - taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); + taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { // TODO @@ -668,34 +696,9 @@ FAIL: return code; } -int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { // - return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen); -#if 0 - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - if (tDecodeSStreamTask(&decoder, pTask) < 0) { - ASSERT(0); - goto FAIL; - } - tDecoderClear(&decoder); - - if (tqExpandTask(pTq, pTask) < 0) { - goto FAIL; - } - - taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); - - return 0; - -FAIL: - if (pTask) taosMemoryFree(pTask); - return -1; -#endif + return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen); } int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { @@ -817,7 +820,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { } } -int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 5709ad7c85..405bc669bd 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -43,6 +43,185 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { return 0; } +int32_t tqMetaOpen(STQ* pTq) { + if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) { + ASSERT(0); + return -1; + } + + if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) { + ASSERT(0); + return -1; + } + + if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) { + ASSERT(0); + return -1; + } + + if (tqMetaRestoreHandle(pTq) < 0) { + return -1; + } + + if (tqMetaRestoreCheckInfo(pTq) < 0) { + return -1; + } + + return 0; +} + +int32_t tqMetaClose(STQ* pTq) { + if (pTq->pExecStore) { + tdbTbClose(pTq->pExecStore); + } + if (pTq->pCheckStore) { + tdbTbClose(pTq->pCheckStore); + } + tdbClose(pTq->pMetaDB); + return 0; +} + +int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) { + TXN txn; + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + return -1; + } + + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { + return -1; + } + + if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) { + return -1; + } + + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { + return -1; + } + + return 0; +} + +int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) { + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn) < 0) { + /*ASSERT(0);*/ + } + + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + return 0; +} + +int32_t tqMetaRestoreCheckInfo(STQ* pTq) { + TBC* pCur = NULL; + if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) { + ASSERT(0); + return -1; + } + + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; + SDecoder decoder; + + tdbTbcMoveToFirst(pCur); + + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + STqCheckInfo info; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + tDecoderClear(&decoder); + if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + tdbTbcClose(pCur); + return 0; +} + +int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { + int32_t code; + int32_t vlen; + tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); + ASSERT(code == 0); + + tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, strlen(pHandle->subKey), pHandle->consumerId, + TD_VID(pTq->pVnode)); + + void* buf = taosMemoryCalloc(1, vlen); + if (buf == NULL) { + ASSERT(0); + } + + SEncoder encoder; + tEncoderInit(&encoder, buf, vlen); + + if (tEncodeSTqHandle(&encoder, pHandle) < 0) { + ASSERT(0); + } + + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) { + ASSERT(0); + } + + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + tEncoderClear(&encoder); + taosMemoryFree(buf); + return 0; +} + +int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) { + /*ASSERT(0);*/ + } + + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + return 0; +} + int32_t tqMetaRestoreHandle(STQ* pTq) { TBC* pCur = NULL; if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) { @@ -93,101 +272,10 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); - taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle)); + taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); } tdbTbcClose(pCur); return 0; } -int32_t tqMetaOpen(STQ* pTq) { - if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) { - ASSERT(0); - return -1; - } - - if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) { - ASSERT(0); - return -1; - } - - if (tqMetaRestoreHandle(pTq) < 0) { - return -1; - } - - return 0; -} - -int32_t tqMetaClose(STQ* pTq) { - if (pTq->pExecStore) { - tdbTbClose(pTq->pExecStore); - } - tdbClose(pTq->pMetaStore); - return 0; -} - -int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { - int32_t code; - int32_t vlen; - tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); - ASSERT(code == 0); - - tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, strlen(pHandle->subKey), pHandle->consumerId, - TD_VID(pTq->pVnode)); - - void* buf = taosMemoryCalloc(1, vlen); - if (buf == NULL) { - ASSERT(0); - } - - SEncoder encoder; - tEncoderInit(&encoder, buf, vlen); - - if (tEncodeSTqHandle(&encoder, pHandle) < 0) { - ASSERT(0); - } - - TXN txn; - - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); - } - - if (tdbBegin(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) { - ASSERT(0); - } - - if (tdbCommit(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - tEncoderClear(&encoder); - taosMemoryFree(buf); - return 0; -} - -int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { - TXN txn; - - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); - } - - if (tdbBegin(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) { - /*ASSERT(0);*/ - } - - if (tdbCommit(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - return 0; -} diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5d7814a045..b8803b20fc 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -394,7 +394,7 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) { int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { void* pIter = NULL; while (1) { - pIter = taosHashIterate(pTq->handles, pIter); + pIter = taosHashIterate(pTq->pHandle, pIter); if (pIter == NULL) break; STqHandle* pExec = (STqHandle*)pIter; if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index b4a7ce7737..c52e0e2c09 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -165,9 +165,9 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { STQ* pTq = pWriter->pTq; if (rollback) { - ASSERT(0); + tdbAbort(pWriter->pTq->pMetaDB, &pWriter->txn); } else { - code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); + code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn); if (code) goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ded3572746..54a5aacbd2 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -196,36 +196,42 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp break; /* TQ */ case TDMT_VND_MQ_VG_CHANGE: - if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + if (tqProcessVgChangeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; case TDMT_VND_MQ_VG_DELETE: - if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + if (tqProcessVgDeleteReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } break; case TDMT_VND_MQ_COMMIT_OFFSET: - if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead), version) < 0) { + if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; - case TDMT_VND_CHECK_ALTER_INFO: - if (tqProcessCheckAlterInfoReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + case TDMT_VND_ADD_CHECK_INFO: + if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { + goto _err; + } + break; + case TDMT_VND_DELETE_CHECK_INFO: + if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; case TDMT_STREAM_TASK_DEPLOY: { - if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + if (tqProcessTaskDeployReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_DROP: { - if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + if (tqProcessTaskDropReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } } break; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index b234ff97c9..c19b459cdb 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -192,6 +192,24 @@ static bool validateTimezoneFormat(const SValueNode* pVal) { return true; } +static int32_t countTrailingSpaces(const SValueNode* pVal, bool isLtrim) { + int32_t numOfSpaces = 0; + int32_t len = varDataLen(pVal->datum.p); + char* str = varDataVal(pVal->datum.p); + + int32_t startPos = isLtrim ? 0 : len - 1; + int32_t step = isLtrim ? 1 : -1; + for (int32_t i = startPos; i < len || i >= 0; i += step) { + if (!isspace(str[i])) { + break; + } + numOfSpaces++; + } + + return numOfSpaces; + +} + void static addTimezoneParam(SNodeList* pList) { char buf[6] = {0}; time_t t = taosTime(NULL); @@ -293,6 +311,40 @@ static int32_t translateInOutStr(SFunctionNode* pFunc, char* pErrBuf, int32_t le return TSDB_CODE_SUCCESS; } +static int32_t translateTrimStr(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isLtrim) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); + if (!IS_VAR_DATA_TYPE(pPara1->resType.type)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + int32_t numOfSpaces = 0; + SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 0); + // for select trim functions with constant value from table, + // need to set the proper result result schema bytes to avoid + // trailing garbage characters + if (nodeType(pParamNode1) == QUERY_NODE_VALUE) { + SValueNode* pValue = (SValueNode*)pParamNode1; + numOfSpaces = countTrailingSpaces(pValue, isLtrim); + } + + + int32_t resBytes = pPara1->resType.bytes - numOfSpaces; + pFunc->node.resType = (SDataType){.bytes = resBytes, .type = pPara1->resType.type}; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateLtrim(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateTrimStr(pFunc, pErrBuf, len, true); +} + +static int32_t translateRtrim(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateTrimStr(pFunc, pErrBuf, len, false); +} + static int32_t translateLogarithm(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); if (1 != numOfParams && 2 != numOfParams) { @@ -2827,7 +2879,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "ltrim", .type = FUNCTION_TYPE_LTRIM, .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, - .translateFunc = translateInOutStr, + .translateFunc = translateLtrim, .getEnvFunc = NULL, .initFunc = NULL, .sprocessFunc = ltrimFunction, @@ -2837,7 +2889,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "rtrim", .type = FUNCTION_TYPE_RTRIM, .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, - .translateFunc = translateInOutStr, + .translateFunc = translateRtrim, .getEnvFunc = NULL, .initFunc = NULL, .sprocessFunc = rtrimFunction, diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index d0c5a76f4b..0d36fa0845 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -758,7 +758,9 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { res->datum.p = taosMemoryCalloc(len, 1); memcpy(res->datum.p, output.columnData->pData, len); } else if (IS_VAR_DATA_TYPE(type)) { - res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); + //res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); + res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData), 1); + res->node.resType.bytes = varDataTLen(output.columnData->pData); memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData)); } else { nodesSetValueNodeValue(res, output.columnData->pData); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b74e838628..64a9537e6c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -81,7 +81,7 @@ void streamMetaClose(SStreamMeta* pMeta) { taosMemoryFree(pMeta); } -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) { +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen) { SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { return -1; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index ebad365ce0..add007e14d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -205,28 +205,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - uint64_t ahandle = head->ahandle; \ - CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \ - SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \ - if (cliMsg->type == Release) return; \ - } \ - tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); \ - if (T_REF_VAL_GET(conn) > 1) { \ - transUnrefCliHandle(conn); \ - } \ - destroyCmsg(pMsg); \ - cliReleaseUnfinishedMsg(conn); \ - transQueueClear(&conn->cliMsgs); \ - addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \ - return; \ - } \ - } while (0) #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ do { \ @@ -358,7 +336,6 @@ void cliHandleResp(SCliConn* conn) { if (cliRecvReleaseReq(conn, pHead)) { return; } - CONN_SHOULD_RELEASE(conn, pHead); if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); @@ -1418,7 +1395,7 @@ int transReleaseCliHandle(void* handle) { } STransMsg tmsg = {.info.handle = handle}; - // TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); + TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cmsg->msg = tmsg;