diff --git a/cmake/bdb_CMakeLists.txt.in b/cmake/bdb_CMakeLists.txt.in index 651a657c0d..52cef74db6 100644 --- a/cmake/bdb_CMakeLists.txt.in +++ b/cmake/bdb_CMakeLists.txt.in @@ -10,4 +10,4 @@ ExternalProject_Add(bdb BUILD_COMMAND "$(MAKE)" INSTALL_COMMAND "" TEST_COMMAND "" -) \ No newline at end of file +) diff --git a/contrib/test/bdb/CMakeLists.txt b/contrib/test/bdb/CMakeLists.txt index 62da0d4ac8..8dd84f7107 100644 --- a/contrib/test/bdb/CMakeLists.txt +++ b/contrib/test/bdb/CMakeLists.txt @@ -4,4 +4,4 @@ target_sources( "bdbTest.c" ) -target_link_libraries(bdbTest bdb) \ No newline at end of file +target_link_libraries(bdbTest bdb) diff --git a/contrib/test/bdb/bdbTest.c b/contrib/test/bdb/bdbTest.c index cbdc07be91..ddcba23b1f 100644 --- a/contrib/test/bdb/bdbTest.c +++ b/contrib/test/bdb/bdbTest.c @@ -185,4 +185,4 @@ static int idx_callback(DB * sdbp, /* secondary db handle */ skeyp->data = tmpdbt; return 0; -} \ No newline at end of file +} diff --git a/example/src/tmq.c b/example/src/tmq.c index d8c90a0127..ac7098b254 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -28,7 +28,7 @@ int32_t init_env() { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7337f1afb8..c6a7d06898 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -52,20 +52,20 @@ extern char* tMsgInfo[]; extern int tMsgDict[]; #define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8) -#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) -#define TMSG_INFO(TYPE) tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] -#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) +#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) +#define TMSG_INFO(TYPE) tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] +#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) typedef uint16_t tmsg_t; /* ------------------------ OTHER DEFINITIONS ------------------------ */ // IE type -#define TSDB_IE_TYPE_SEC 1 -#define TSDB_IE_TYPE_META 2 -#define TSDB_IE_TYPE_MGMT_IP 3 -#define TSDB_IE_TYPE_DNODE_CFG 4 +#define TSDB_IE_TYPE_SEC 1 +#define TSDB_IE_TYPE_META 2 +#define TSDB_IE_TYPE_MGMT_IP 3 +#define TSDB_IE_TYPE_DNODE_CFG 4 #define TSDB_IE_TYPE_NEW_VERSION 5 -#define TSDB_IE_TYPE_DNODE_EXT 6 +#define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_STATE 7 typedef enum { @@ -110,53 +110,53 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_MAX, } EShowType; -#define TSDB_ALTER_TABLE_ADD_TAG 1 -#define TSDB_ALTER_TABLE_DROP_TAG 2 +#define TSDB_ALTER_TABLE_ADD_TAG 1 +#define TSDB_ALTER_TABLE_DROP_TAG 2 #define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3 -#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 +#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 -#define TSDB_ALTER_TABLE_ADD_COLUMN 5 -#define TSDB_ALTER_TABLE_DROP_COLUMN 6 +#define TSDB_ALTER_TABLE_ADD_COLUMN 5 +#define TSDB_ALTER_TABLE_DROP_COLUMN 6 #define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7 -#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8 +#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8 -#define TSDB_FILL_NONE 0 -#define TSDB_FILL_NULL 1 +#define TSDB_FILL_NONE 0 +#define TSDB_FILL_NULL 1 #define TSDB_FILL_SET_VALUE 2 -#define TSDB_FILL_LINEAR 3 -#define TSDB_FILL_PREV 4 -#define TSDB_FILL_NEXT 5 +#define TSDB_FILL_LINEAR 3 +#define TSDB_FILL_PREV 4 +#define TSDB_FILL_NEXT 5 -#define TSDB_ALTER_USER_PASSWD 0x1 -#define TSDB_ALTER_USER_SUPERUSER 0x2 -#define TSDB_ALTER_USER_ADD_READ_DB 0x3 -#define TSDB_ALTER_USER_REMOVE_READ_DB 0x4 -#define TSDB_ALTER_USER_CLEAR_READ_DB 0x5 -#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6 +#define TSDB_ALTER_USER_PASSWD 0x1 +#define TSDB_ALTER_USER_SUPERUSER 0x2 +#define TSDB_ALTER_USER_ADD_READ_DB 0x3 +#define TSDB_ALTER_USER_REMOVE_READ_DB 0x4 +#define TSDB_ALTER_USER_CLEAR_READ_DB 0x5 +#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6 #define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7 -#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8 +#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8 #define TSDB_ALTER_USER_PRIVILEGES 0x2 #define TSDB_KILL_MSG_LEN 30 -#define TSDB_VN_READ_ACCCESS ((char)0x1) +#define TSDB_VN_READ_ACCCESS ((char)0x1) #define TSDB_VN_WRITE_ACCCESS ((char)0x2) -#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) +#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) #define TSDB_COL_NORMAL 0x0u // the normal column of the table -#define TSDB_COL_TAG 0x1u // the tag column type -#define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column -#define TSDB_COL_TMP 0x4u // internal column generated by the previous operators -#define TSDB_COL_NULL 0x8u // the column filter NULL or not +#define TSDB_COL_TAG 0x1u // the tag column type +#define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column +#define TSDB_COL_TMP 0x4u // internal column generated by the previous operators +#define TSDB_COL_NULL 0x8u // the column filter NULL or not -#define TSDB_COL_IS_TAG(f) (((f & (~(TSDB_COL_NULL))) & TSDB_COL_TAG) != 0) +#define TSDB_COL_IS_TAG(f) (((f & (~(TSDB_COL_NULL))) & TSDB_COL_TAG) != 0) #define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) -#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) -#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) +#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) +#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) -#define TD_SUPER_TABLE TSDB_SUPER_TABLE -#define TD_CHILD_TABLE TSDB_CHILD_TABLE +#define TD_SUPER_TABLE TSDB_SUPER_TABLE +#define TD_CHILD_TABLE TSDB_CHILD_TABLE #define TD_NORMAL_TABLE TSDB_NORMAL_TABLE typedef struct { @@ -1083,11 +1083,11 @@ typedef struct { } STaskDropRsp; typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; - int8_t igExists; - char* sql; - char* physicalPlan; - char* logicalPlan; + char name[TSDB_TOPIC_FNAME_LEN]; + int8_t igExists; + char* sql; + char* physicalPlan; + char* logicalPlan; } SMCreateTopicReq; int32_t tSerializeMCreateTopicReq(void* buf, int32_t bufLen, const SMCreateTopicReq* pReq); @@ -1733,7 +1733,7 @@ typedef struct { int32_t vgId; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; typedef struct { @@ -1741,9 +1741,42 @@ typedef struct { int32_t vgId; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqMVRebRsp; +typedef struct { + int32_t vgId; + int64_t offset; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; +} SMqOffset; + +typedef struct { + int32_t num; + SMqOffset* offsets; +} SMqCMResetOffsetReq; + +typedef struct { + int32_t reserved; +} SMqCMResetOffsetRsp; + +typedef struct { + int32_t num; + SMqOffset* offsets; +} SMqMVResetOffsetReq; + +typedef struct { + int32_t reserved; +} SMqMVResetOffsetRsp; + +int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset); +int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset); +int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq); +int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq); + +int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq); +int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq); + typedef struct { uint32_t nCols; SSchema* pSchema; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index ca30e682bf..9e5613ef03 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -142,6 +142,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) + TD_DEF_MSG_TYPE(TDMT_MND_RESET_OFFSET, "mnode-reset-offset", SMqCMResetOffsetReq, SMqCMResetOffsetRsp) TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 419d6294a7..47f278d380 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -35,9 +35,7 @@ struct tmq_list_t { }; struct tmq_topic_vgroup_t { - char* topic; - int32_t vgId; - int64_t offset; + SMqOffset offset; }; struct tmq_topic_vgroup_list_t { @@ -123,6 +121,12 @@ typedef struct { tsem_t rspSem; } SMqCommitCbParam; +typedef struct { + tmq_t* tmq; + tsem_t rspSem; + tmq_resp_err_t rspErr; +} SMqResetOffsetParam; + tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); conf->auto_commit = false; @@ -173,12 +177,6 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) { return 0; } -tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { - // build msg - // send to mnode - return TMQ_RESP_ERR__SUCCESS; -} - int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; @@ -196,6 +194,13 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } +int32_t tmqResetOffsetCb(void* param, const SDataBuf* pMsg, int32_t code) { + SMqResetOffsetParam* pParam = (SMqResetOffsetParam*)param; + pParam->rspErr = code; + tsem_post(&pParam->rspSem); + return 0; +} + tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = calloc(sizeof(tmq_t), 1); if (pTmq == NULL) { @@ -218,6 +223,55 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return pTmq; } +tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { + SRequestObj* pRequest = NULL; + // build msg + // send to mnode + SMqCMResetOffsetReq req; + req.num = offsets->cnt; + req.offsets = (SMqOffset*)offsets->elems; + + SCoder encoder; + + tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); + tEncodeSMqCMResetOffsetReq(&encoder, &req); + int32_t tlen = encoder.pos; + void* buf = malloc(tlen); + if (buf == NULL) { + tCoderClear(&encoder); + return -1; + } + tCoderClear(&encoder); + + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); + tEncodeSMqCMResetOffsetReq(&encoder, &req); + tCoderClear(&encoder); + + pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_RESET_OFFSET); + if (pRequest == NULL) { + tscError("failed to malloc request"); + } + + SMqResetOffsetParam param = {0}; + tsem_init(¶m.rspSem, 0, 0); + param.tmq = tmq; + + pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->param = ¶m; + sendInfo->fp = tmqResetOffsetCb; + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + + tsem_wait(¶m.rspSem); + tsem_destroy(¶m.rspSem); + + return TMQ_RESP_ERR__SUCCESS; +} + tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { SRequestObj* pRequest = NULL; int32_t sz = topic_list->cnt; @@ -244,6 +298,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN); if (topicFname == NULL) { + goto _return; } tNameExtractFullName(&name, topicFname); tscDebug("subscribe topic: %s", topicFname); @@ -251,9 +306,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { .nextVgIdx = 0, .sql = NULL, .sqlLen = 0, .topicId = 0, .topicName = topicFname, .vgs = NULL}; topic.vgs = taosArrayInit(0, sizeof(SMqClientVg)); taosArrayPush(tmq->clientTopics, &topic); - /*SMqClientTopic topic = {*/ - /*.*/ - /*};*/ taosArrayPush(req.topicNames, &topicFname); free(dbName); } @@ -270,7 +322,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE); if (pRequest == NULL) { - tscError("failed to malloc sqlObj"); + tscError("failed to malloc request"); } SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq}; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3806227803..4b6171362e 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2306,3 +2306,57 @@ int32_t tDeserializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) { tCoderClear(&decoder); return 0; } + +int32_t tEncodeSMqOffset(SCoder *encoder, const SMqOffset *pOffset) { + if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1; + if (tEncodeI64(encoder, pOffset->offset) < 0) return -1; + if (tEncodeCStr(encoder, pOffset->topicName) < 0) return -1; + if (tEncodeCStr(encoder, pOffset->cgroup) < 0) return -1; + return encoder->pos; +} + +int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) { + if (tDecodeI32(decoder, &pOffset->vgId) < 0) return -1; + if (tDecodeI64(decoder, &pOffset->offset) < 0) return -1; + if (tDecodeCStrTo(decoder, pOffset->topicName) < 0) return -1; + if (tDecodeCStrTo(decoder, pOffset->cgroup) < 0) return -1; + return 0; +} + +int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) { + if (tStartEncode(encoder) < 0) return -1; + if (tEncodeI32(encoder, pReq->num) < 0) return -1; + for (int32_t i = 0; i < pReq->num; i++) { + tEncodeSMqOffset(encoder, &pReq->offsets[i]); + } + tEndEncode(encoder); + return encoder->pos; +} + +int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) { + if (tDecodeI32(decoder, &pReq->num) < 0) return -1; + pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder); + if (pReq->offsets == NULL) return -1; + for (int32_t i = 0; i < pReq->num; i++) { + tDecodeSMqOffset(decoder, &pReq->offsets[i]); + } + return 0; +} + +int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) { + if (tEncodeI32(encoder, pReq->num) < 0) return -1; + for (int32_t i = 0; i < pReq->num; i++) { + tEncodeSMqOffset(encoder, &pReq->offsets[i]); + } + return encoder->pos; +} + +int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) { + if (tDecodeI32(decoder, &pReq->num) < 0) return -1; + pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder); + if (pReq->offsets == NULL) return -1; + for (int32_t i = 0; i < pReq->num; i++) { + tDecodeSMqOffset(decoder, &pReq->offsets[i]); + } + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 840326d318..7fc194de9c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -345,6 +345,14 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } } + if (status == MQ_CONSUMER_STATUS__MODIFY) { + int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics); + for (int32_t i = 0; i < removeSz; i++) { + char *topicName = taosArrayGet(pConsumer->recentRemovedTopics, i); + free(topicName); + } + taosArrayClear(pConsumer->recentRemovedTopics); + } } } if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { @@ -1013,6 +1021,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { break; } } + char *oldTopicNameDup = strdup(oldTopicName); + taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup); atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY); /*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/ } else if (newTopicName != NULL) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2f70ab5080..4dc46b3798 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -192,12 +192,12 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu if (pTopic->pReadhandle == NULL) { ASSERT(false); } - for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - pTopic->buffer.output[i].status = 0; + for (int j = 0; j < TQ_BUFFER_SIZE; j++) { + pTopic->buffer.output[j].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; - pTopic->buffer.output[i].pReadHandle = pReadHandle; - pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); + pTopic->buffer.output[j].pReadHandle = pReadHandle; + pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); } }