From 8b3489760c1447e9789589cf29625fa5ed5fc227 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 30 Aug 2022 10:57:24 +0800 Subject: [PATCH] feat(tmq): support taosx --- include/client/taos.h | 1 + include/common/tcommon.h | 2 +- include/common/tmsg.h | 44 +++++--- include/util/tencode.h | 28 +++-- source/client/inc/clientInt.h | 23 +++- source/client/src/clientMain.c | 13 +++ source/client/src/tmq.c | 67 +++++++++-- source/common/src/tmsg.c | 122 +++++++++++++++++---- source/dnode/mnode/impl/src/mndSubscribe.c | 1 + 9 files changed, 243 insertions(+), 58 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index f260b84f4a..49cfbb52b8 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -254,6 +254,7 @@ enum tmq_res_t { TMQ_RES_INVALID = -1, TMQ_RES_DATA = 1, TMQ_RES_TABLE_META = 2, + TMQ_RES_TAOSX = 3, }; typedef struct tmq_raw_data { diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 03672f96f3..891c9ab040 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -73,6 +73,7 @@ enum { TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__POLL_META_RSP, TMQ_MSG_TYPE__EP_RSP, + TMQ_MSG_TYPE__TAOSX_RSP, TMQ_MSG_TYPE__END_RSP, }; @@ -129,7 +130,6 @@ typedef struct SDataBlockInfo { uint32_t capacity; // TODO: optimize and remove following int64_t version; // used for stream, and need serialization - int64_t ts; // used for stream, and need serialization int32_t childId; // used for stream, do not serialize EStreamType type; // used for stream, do not serialize STimeWindow calWin; // used for stream, do not serialize diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 681094471a..d503592361 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -276,7 +276,6 @@ struct SSchema { char name[TSDB_COL_NAME_LEN]; }; - typedef struct { char tbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; @@ -295,17 +294,15 @@ typedef struct { SSchema* pSchemas; } STableMetaRsp; - - typedef struct { - int32_t code; - int8_t hashMeta; - int64_t uid; - char* tblFName; - int32_t numOfRows; - int32_t affectedRows; - int64_t sver; - STableMetaRsp* pMeta; + int32_t code; + int8_t hashMeta; + int64_t uid; + char* tblFName; + int32_t numOfRows; + int32_t affectedRows; + int64_t sver; + STableMetaRsp* pMeta; } SSubmitBlkRsp; typedef struct { @@ -320,7 +317,7 @@ typedef struct { int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); -void tFreeSSubmitBlkRsp(void* param); +void tFreeSSubmitBlkRsp(void* param); void tFreeSSubmitRsp(SSubmitRsp* pRsp); #define COL_SMA_ON ((int8_t)0x1) @@ -2049,8 +2046,8 @@ typedef struct { STableMetaRsp* pMeta; } SVCreateTbRsp, SVUpdateTbRsp; -int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp); -int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp); +int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp); +int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp); void tFreeSVCreateTbRsp(void* param); int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); @@ -2961,6 +2958,25 @@ typedef struct { int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); +typedef struct { + SMqRspHead head; + STqOffsetVal reqOffset; + STqOffsetVal rspOffset; + int32_t blockNum; + int8_t withTbName; + int8_t withSchema; + SArray* blockDataLen; + SArray* blockData; + SArray* blockTbName; + SArray* blockSchema; + int32_t createTableNum; + SArray* createTableLen; + SArray* createTableReq; +} STaosxRsp; + +int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const STaosxRsp* pRsp); +int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp); + typedef struct { SMqRspHead head; char cgroup[TSDB_CGROUP_LEN]; diff --git a/include/util/tencode.h b/include/util/tencode.h index ad642cd612..a6dd58297e 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -264,12 +264,14 @@ static FORCE_INLINE int32_t tEncodeDouble(SEncoder* pCoder, double val) { static FORCE_INLINE int32_t tEncodeBinary(SEncoder* pCoder, const uint8_t* val, uint32_t len) { if (tEncodeU32v(pCoder, len) < 0) return -1; - if (pCoder->data) { - if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, len)) return -1; - memcpy(TD_CODER_CURRENT(pCoder), val, len); - } + if (len) { + if (pCoder->data) { + if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, len)) return -1; + memcpy(TD_CODER_CURRENT(pCoder), val, len); + } - TD_CODER_MOVE_POS(pCoder, len); + TD_CODER_MOVE_POS(pCoder, len); + } return 0; } @@ -414,14 +416,18 @@ static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val) { static FORCE_INLINE int32_t tDecodeBinaryAlloc(SDecoder* pCoder, void** val, uint64_t* len) { uint64_t length = 0; if (tDecodeU64v(pCoder, &length) < 0) return -1; - if (len) *len = length; + if (length) { + if (len) *len = length; - if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1; - *val = taosMemoryMalloc(length); - if (*val == NULL) return -1; - memcpy(*val, TD_CODER_CURRENT(pCoder), length); + if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1; + *val = taosMemoryMalloc(length); + if (*val == NULL) return -1; + memcpy(*val, TD_CODER_CURRENT(pCoder), length); - TD_CODER_MOVE_POS(pCoder, length); + TD_CODER_MOVE_POS(pCoder, length); + } else { + *val = NULL; + } return 0; } diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 07e5f75e87..c26208b9b9 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -52,15 +52,17 @@ enum { RES_TYPE__QUERY = 1, RES_TYPE__TMQ, RES_TYPE__TMQ_META, + RES_TYPE__TAOSX, }; #define SHOW_VARIABLES_RESULT_COLS 2 #define SHOW_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE) #define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) -#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) -#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) -#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) +#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) +#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ || *(int8_t*)res == RES_TYPE__TAOSX) +#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) +#define TD_RES_TMQ_TAOSX(res) (*(int8_t*)res == RES_TYPE__TAOSX) typedef struct SAppInstInfo SAppInstInfo; @@ -198,8 +200,8 @@ typedef struct { int32_t vgId; SSchemaWrapper schema; int32_t resIter; - SMqDataRsp rsp; SReqResultInfo resInfo; + SMqDataRsp rsp; } SMqRspObj; typedef struct { @@ -210,6 +212,17 @@ typedef struct { SMqMetaRsp metaRsp; } SMqMetaRspObj; +typedef struct { + int8_t resType; + char topic[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int32_t vgId; + SSchemaWrapper schema; + int32_t resIter; + SReqResultInfo resInfo; + STaosxRsp rsp; +} SMqTaosxRspObj; + typedef struct SRequestObj { int8_t resType; // query or tmq uint64_t requestId; @@ -369,7 +382,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList); void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta); -int32_t removeMeta(STscObj* pTscObj, SArray* tbList); +int32_t removeMeta(STscObj* pTscObj, SArray* tbList); int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog); bool qnodeRequired(SRequestObj* pRequest); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9ceb6e0683..3086078080 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -184,6 +184,19 @@ void taos_free_result(TAOS_RES *res) { SRequestObj *pRequest = (SRequestObj *)res; tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId); destroyRequest(pRequest); + } else if (TD_RES_TMQ_TAOSX(res)) { + SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res; + if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); + if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); + if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); + if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + // taosx + taosArrayDestroy(pRsp->rsp.createTableLen); + taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree); + + pRsp->resInfo.pRspMsg = NULL; + doFreeReqResultInfo(&pRsp->resInfo); + taosMemoryFree(pRsp); } else if (TD_RES_TMQ(res)) { SMqRspObj *pRsp = (SMqRspObj *)res; if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index fa657fcb10..29d509c27c 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -164,6 +164,7 @@ typedef struct { union { SMqDataRsp dataRsp; SMqMetaRsp metaRsp; + STaosxRsp taosxRsp; }; } SMqPollRspWrapper; @@ -1130,21 +1131,29 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp); tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); - } else { - ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); + + tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d", + tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version, + rspType); + + } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp); tDecoderClear(&decoder); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); + } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) { + SDecoder decoder; + tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); + tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp); + tDecoderClear(&decoder); + memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead)); + } else { + ASSERT(0); } taosMemoryFree(pMsg->pData); - tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d", - tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version, - rspType); - taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); @@ -1443,6 +1452,24 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { return pRspObj; } +SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { + SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); + pRspObj->resType = RES_TYPE__TAOSX; + tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); + tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); + pRspObj->vgId = pWrapper->vgHandle->vgId; + pRspObj->resIter = -1; + memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqTaosxRspObj)); + + pRspObj->resInfo.totalRows = 0; + pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; + if (!pWrapper->dataRsp.withSchema) { + setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); + } + + return pRspObj; +} + int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { /*tscDebug("call poll");*/ for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { @@ -1595,6 +1622,30 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pollRspWrapper->metaRsp.head.epoch, consumerEpoch); taosFreeQitem(pollRspWrapper); } + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { + SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; + /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ + int32_t consumerEpoch = atomic_load_32(&tmq->epoch); + if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { + SMqClientVg* pVg = pollRspWrapper->vgHandle; + /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, + * rspMsg->msg.rspOffset);*/ + pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + if (pollRspWrapper->taosxRsp.blockNum == 0) { + taosFreeQitem(pollRspWrapper); + rspWrapper = NULL; + continue; + } + // build rsp + SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); + taosFreeQitem(pollRspWrapper); + return pRsp; + } else { + tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", + pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + taosFreeQitem(pollRspWrapper); + } } else { /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/ bool reset = false; @@ -1707,9 +1758,11 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) { - return TMQ_RES_DATA; + return TMQ_RES_TAOSX; } return TMQ_RES_TABLE_META; + } else if (TD_RES_TMQ_TAOSX(res)) { + return TMQ_RES_TAOSX; } else { return TMQ_RES_INVALID; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8dc4931573..2fc93cc9b5 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3330,7 +3330,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) { return 0; } -void tFreeSTableMetaRsp(void *pRsp) { taosMemoryFreeClear(((STableMetaRsp*)pRsp)->pSchemas); } +void tFreeSTableMetaRsp(void *pRsp) { taosMemoryFreeClear(((STableMetaRsp *)pRsp)->pSchemas); } void tFreeSTableIndexRsp(void *info) { if (NULL == info) { @@ -5119,17 +5119,17 @@ int tDecodeSVCreateTbRsp(SDecoder *pCoder, SVCreateTbRsp *pRsp) { } else { pRsp->pMeta = NULL; } - + tEndDecode(pCoder); return 0; } -void tFreeSVCreateTbRsp(void* param) { +void tFreeSVCreateTbRsp(void *param) { if (NULL == param) { return; } - - SVCreateTbRsp* pRsp = (SVCreateTbRsp*)param; + + SVCreateTbRsp *pRsp = (SVCreateTbRsp *)param; if (pRsp->pMeta) { taosMemoryFree(pRsp->pMeta->pSchemas); taosMemoryFree(pRsp->pMeta); @@ -5345,7 +5345,7 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) { if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1; if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1; if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1; - + int32_t meta = 0; if (tDecodeI32(pDecoder, &meta) < 0) return -1; if (meta) { @@ -5393,12 +5393,12 @@ int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) { return 0; } -void tFreeSSubmitBlkRsp(void* param) { +void tFreeSSubmitBlkRsp(void *param) { if (NULL == param) { return; } - - SSubmitBlkRsp* pRsp = (SSubmitBlkRsp*)param; + + SSubmitBlkRsp *pRsp = (SSubmitBlkRsp *)param; taosMemoryFree(pRsp->tblFName); if (pRsp->pMeta) { @@ -5407,7 +5407,6 @@ void tFreeSSubmitBlkRsp(void* param) { } } - void tFreeSSubmitRsp(SSubmitRsp *pRsp) { if (NULL == pRsp) return; @@ -5619,7 +5618,6 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) { } } - int32_t tEncodeSMCreateStbRsp(SEncoder *pEncoder, const SMCreateStbRsp *pRsp) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->pMeta->pSchemas ? 1 : 0) < 0) return -1; @@ -5671,8 +5669,6 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp *pRsp) { } } - - int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) { if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1; if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) { @@ -5690,7 +5686,7 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { if (tDecodeI8(pDecoder, &pOffsetVal->type) < 0) return -1; - if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) { + if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) { if (tDecodeI64(pDecoder, &pOffsetVal->uid) < 0) return -1; if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1; } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { @@ -5712,7 +5708,7 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { snprintf(buf, maxLen, "offset(reset to latest)"); } else if (pVal->type == TMQ_OFFSET__LOG) { snprintf(buf, maxLen, "offset(log) ver:%" PRId64, pVal->version); - } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) { + } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) { snprintf(buf, maxLen, "offset(ss data) uid:%" PRId64 ", ts:%" PRId64, pVal->uid, pVal->ts); } else { ASSERT(0); @@ -5813,17 +5809,17 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) { return 0; } -int32_t tEncodeSMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp) { +int32_t tEncodeSMqMetaRsp(SEncoder *pEncoder, const SMqMetaRsp *pRsp) { if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; - if(tEncodeI16(pEncoder, pRsp->resMsgType)) return -1; - if(tEncodeBinary(pEncoder, pRsp->metaRsp, pRsp->metaRspLen)) return -1; + if (tEncodeI16(pEncoder, pRsp->resMsgType)) return -1; + if (tEncodeBinary(pEncoder, pRsp->metaRsp, pRsp->metaRspLen)) return -1; return 0; } -int32_t tDecodeSMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp) { +int32_t tDecodeSMqMetaRsp(SDecoder *pDecoder, SMqMetaRsp *pRsp) { if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; if (tDecodeI16(pDecoder, &pRsp->resMsgType) < 0) return -1; - if (tDecodeBinaryAlloc(pDecoder, &pRsp->metaRsp, (uint64_t*)&pRsp->metaRspLen) < 0) return -1; + if (tDecodeBinaryAlloc(pDecoder, &pRsp->metaRsp, (uint64_t *)&pRsp->metaRspLen) < 0) return -1; return 0; } @@ -5893,6 +5889,92 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { return 0; } +int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { + if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; + if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1; + if (pRsp->blockNum != 0) { + if (tEncodeI8(pEncoder, pRsp->withTbName) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->withSchema) < 0) return -1; + + for (int32_t i = 0; i < pRsp->blockNum; i++) { + int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i); + void *data = taosArrayGetP(pRsp->blockData, i); + if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1; + if (pRsp->withSchema) { + SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i); + if (tEncodeSSchemaWrapper(pEncoder, pSW) < 0) return -1; + } + if (pRsp->withTbName) { + char *tbName = (char *)taosArrayGetP(pRsp->blockTbName, i); + if (tEncodeCStr(pEncoder, tbName) < 0) return -1; + } + } + } + if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1; + if (pRsp->createTableNum) { + for (int32_t i = 0; i < pRsp->createTableNum; i++) { + void *createTableReq = taosArrayGetP(pRsp->createTableReq, i); + int32_t createTableLen = *(int32_t *)taosArrayGet(pRsp->createTableLen, i); + if (tEncodeBinary(pEncoder, createTableReq, createTableLen) < 0) return -1; + } + } + return 0; +} + +int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { + if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1; + if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1; + if (pRsp->blockNum != 0) { + pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void *)); + pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t)); + if (tDecodeI8(pDecoder, &pRsp->withTbName) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->withSchema) < 0) return -1; + if (pRsp->withTbName) { + pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void *)); + } + if (pRsp->withSchema) { + pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void *)); + } + + for (int32_t i = 0; i < pRsp->blockNum; i++) { + void *data; + uint64_t bLen; + if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1; + taosArrayPush(pRsp->blockData, &data); + int32_t len = bLen; + taosArrayPush(pRsp->blockDataLen, &len); + + if (pRsp->withSchema) { + SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pSW == NULL) return -1; + if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) return -1; + taosArrayPush(pRsp->blockSchema, &pSW); + } + + if (pRsp->withTbName) { + char *tbName; + if (tDecodeCStrAlloc(pDecoder, &tbName) < 0) return -1; + taosArrayPush(pRsp->blockTbName, &tbName); + } + } + } + if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1; + if (pRsp->createTableNum) { + pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t)); + pRsp->createTableReq = taosArrayInit(pRsp->createTableNum, sizeof(void *)); + for (int32_t i = 0; i < pRsp->createTableNum; i++) { + void *pCreate = NULL; + uint64_t len; + if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1; + int32_t l = (int32_t)len; + taosArrayPush(pRsp->createTableLen, &l); + taosArrayPush(pRsp->createTableReq, &pCreate); + } + } + return 0; +} int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1; if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 10e520d9ec..1452c5ae2f 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -287,6 +287,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR if (consumerVgNum > minVgCnt) { if (imbCnt < imbConsumerNum) { if (consumerVgNum == minVgCnt + 1) { + imbCnt++; continue; } else { // pop until equal minVg + 1