feat:add parameters for consumer & add offset rows for subscription
This commit is contained in:
		
							parent
							
								
									11321aa279
								
							
						
					
					
						commit
						3f46a9bbc9
					
				|  | @ -2033,6 +2033,12 @@ typedef struct { | |||
|   char    cgroup[TSDB_CGROUP_LEN]; | ||||
|   char    clientId[256]; | ||||
|   SArray* topicNames;  // SArray<char**>
 | ||||
| 
 | ||||
|   int8_t         withTbName; | ||||
|   int8_t         useSnapshot; | ||||
|   int8_t         autoCommit; | ||||
|   int32_t        autoCommitInterval; | ||||
|   int8_t         resetOffsetCfg; | ||||
| } SCMSubscribeReq; | ||||
| 
 | ||||
| static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { | ||||
|  | @ -2047,6 +2053,13 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc | |||
|   for (int32_t i = 0; i < topicNum; i++) { | ||||
|     tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i)); | ||||
|   } | ||||
| 
 | ||||
|   tlen += taosEncodeFixedI8(buf, pReq->withTbName); | ||||
|   tlen += taosEncodeFixedI8(buf, pReq->useSnapshot); | ||||
|   tlen += taosEncodeFixedI8(buf, pReq->autoCommit); | ||||
|   tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval); | ||||
|   tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); | ||||
| 
 | ||||
|   return tlen; | ||||
| } | ||||
| 
 | ||||
|  | @ -2064,6 +2077,12 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq | |||
|     buf = taosDecodeString(buf, &name); | ||||
|     taosArrayPush(pReq->topicNames, &name); | ||||
|   } | ||||
| 
 | ||||
|   buf = taosDecodeFixedI8(buf, &pReq->withTbName); | ||||
|   buf = taosDecodeFixedI8(buf, &pReq->useSnapshot); | ||||
|   buf = taosDecodeFixedI8(buf, &pReq->autoCommit); | ||||
|   buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval); | ||||
|   buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); | ||||
|   return buf; | ||||
| } | ||||
| 
 | ||||
|  | @ -2455,15 +2474,6 @@ typedef struct { | |||
|   char    cgroup[TSDB_CGROUP_LEN]; | ||||
| } SMqAskEpReq; | ||||
| 
 | ||||
| typedef struct { | ||||
|   int64_t consumerId; | ||||
|   int32_t epoch; | ||||
| } SMqHbReq; | ||||
| 
 | ||||
| typedef struct { | ||||
|   int8_t reserved; | ||||
| } SMqHbRsp; | ||||
| 
 | ||||
| typedef struct { | ||||
|   int32_t key; | ||||
|   int32_t valueLen; | ||||
|  | @ -3354,6 +3364,28 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) { | |||
|   taosArrayDestroyEx(pRsp->topics, (FDelete)tDeleteMqSubTopicEp); | ||||
| } | ||||
| 
 | ||||
| typedef struct { | ||||
|   int32_t      vgId; | ||||
|   STqOffsetVal offset; | ||||
|   int64_t      rows; | ||||
| }OffsetRows; | ||||
| 
 | ||||
| typedef struct{ | ||||
|   char       topicName[TSDB_TOPIC_FNAME_LEN]; | ||||
|   SArray*    offsetRows; | ||||
| }TopicOffsetRows; | ||||
| 
 | ||||
| typedef struct { | ||||
|   int64_t consumerId; | ||||
|   int32_t epoch; | ||||
|   SArray* topics; | ||||
| } SMqHbReq; | ||||
| 
 | ||||
| typedef struct { | ||||
|   int8_t reserved; | ||||
| } SMqHbRsp; | ||||
| 
 | ||||
| 
 | ||||
| #define TD_AUTO_CREATE_TABLE 0x1 | ||||
| typedef struct { | ||||
|   int64_t       suid; | ||||
|  | @ -3478,10 +3510,8 @@ int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); | |||
| int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); | ||||
| int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); | ||||
| int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); | ||||
| int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); | ||||
| int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); | ||||
| int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); | ||||
| int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); | ||||
| int32_t tDeatroySMqHbReq(SMqHbReq* pReq); | ||||
| 
 | ||||
| 
 | ||||
| #define SUBMIT_REQ_AUTO_CREATE_TABLE  0x1 | ||||
| #define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 | ||||
|  |  | |||
|  | @ -195,6 +195,7 @@ typedef enum ELogicConditionType { | |||
| #define TSDB_TABLE_NAME_LEN  193                                // it is a null-terminated string
 | ||||
| #define TSDB_TOPIC_NAME_LEN  193                                // it is a null-terminated string
 | ||||
| #define TSDB_CGROUP_LEN      193                                // it is a null-terminated string
 | ||||
| #define TSDB_OFFSET_LEN      80                                // it is a null-terminated string
 | ||||
| #define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN)  // it is a null-terminated string
 | ||||
| #define TSDB_STREAM_NAME_LEN 193                                // it is a null-terminated string
 | ||||
| #define TSDB_DB_NAME_LEN     65 | ||||
|  |  | |||
|  | @ -82,7 +82,7 @@ struct tmq_t { | |||
|   int8_t         useSnapshot; | ||||
|   int8_t         autoCommit; | ||||
|   int32_t        autoCommitInterval; | ||||
|   int32_t        resetOffsetCfg; | ||||
|   int8_t         resetOffsetCfg; | ||||
|   uint64_t       consumerId; | ||||
|   bool           hbBgEnable; | ||||
|   tmq_commit_cb* commitCb; | ||||
|  | @ -567,10 +567,10 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN | |||
|   atomic_add_fetch_32(&pParamSet->totalRspNum, 1); | ||||
| 
 | ||||
|   SEp* pEp = GET_ACTIVE_EP(&pVg->epSet); | ||||
|   char offsetBuf[80] = {0}; | ||||
|   char offsetBuf[TSDB_OFFSET_LEN] = {0}; | ||||
|   tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val); | ||||
| 
 | ||||
|   char commitBuf[80] = {0}; | ||||
|   char commitBuf[TSDB_OFFSET_LEN] = {0}; | ||||
|   tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); | ||||
|   tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, | ||||
|            tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1, | ||||
|  | @ -796,6 +796,21 @@ void tmqSendHbReq(void* param, void* tmrId) { | |||
|   SMqHbReq req = {0}; | ||||
|   req.consumerId = tmq->consumerId; | ||||
|   req.epoch = tmq->epoch; | ||||
|   req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); | ||||
|   for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){ | ||||
|     SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); | ||||
|     int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs); | ||||
|     TopicOffsetRows* data = taosArrayReserve(req.topics, 1); | ||||
|     strcpy(data->topicName, pTopic->topicName); | ||||
|     data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows)); | ||||
|     for(int j = 0; j < numOfVgroups; j++){ | ||||
|       SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); | ||||
|       OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); | ||||
|       offRows->vgId = pVg->vgId; | ||||
|       offRows->rows = pVg->numOfRows; | ||||
|       offRows->offset = pVg->offsetInfo.committedOffset; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); | ||||
|   if (tlen < 0) { | ||||
|  | @ -835,6 +850,7 @@ void tmqSendHbReq(void* param, void* tmrId) { | |||
|   asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); | ||||
| 
 | ||||
| OVER: | ||||
|   tDeatroySMqHbReq(&req); | ||||
|   taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer); | ||||
|   taosReleaseRef(tmqMgmt.rsetId, refId); | ||||
| } | ||||
|  | @ -1107,7 +1123,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { | |||
|     pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer); | ||||
|   } | ||||
| 
 | ||||
|   char         buf[80] = {0}; | ||||
|   char         buf[TSDB_OFFSET_LEN] = {0}; | ||||
|   STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; | ||||
|   tFormatOffset(buf, tListLen(buf), &offset); | ||||
|   tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64 | ||||
|  | @ -1123,7 +1139,7 @@ _failed: | |||
| } | ||||
| 
 | ||||
| int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { | ||||
|   const int32_t   MAX_RETRY_COUNT = 120 * 60;  // let's wait for 2 mins at most
 | ||||
|   const int32_t   MAX_RETRY_COUNT = 120 * 2;  // let's wait for 2 mins at most
 | ||||
|   const SArray*   container = &topic_list->container; | ||||
|   int32_t         sz = taosArrayGetSize(container); | ||||
|   void*           buf = NULL; | ||||
|  | @ -1143,6 +1159,12 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { | |||
|     goto FAIL; | ||||
|   } | ||||
| 
 | ||||
|   req.withTbName = tmq->withTbName; | ||||
|   req.useSnapshot = tmq->useSnapshot; | ||||
|   req.autoCommit = tmq->autoCommit; | ||||
|   req.autoCommitInterval = tmq->autoCommitInterval; | ||||
|   req.resetOffsetCfg = tmq->resetOffsetCfg; | ||||
| 
 | ||||
|   for (int32_t i = 0; i < sz; i++) { | ||||
|     char* topic = taosArrayGetP(container, i); | ||||
| 
 | ||||
|  | @ -1375,8 +1397,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { | |||
|     tDecoderClear(&decoder); | ||||
|     memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); | ||||
| 
 | ||||
|     char buf[80]; | ||||
|     tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset); | ||||
|     char buf[TSDB_OFFSET_LEN]; | ||||
|     tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.rspOffset); | ||||
|     tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64, | ||||
|              tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId); | ||||
|   } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { | ||||
|  | @ -1523,8 +1545,8 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) | |||
|         SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); | ||||
|         makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); | ||||
| 
 | ||||
|         char buf[80]; | ||||
|         tFormatOffset(buf, 80, &pVgCur->offsetInfo.currentOffset); | ||||
|         char buf[TSDB_OFFSET_LEN]; | ||||
|         tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset); | ||||
|         tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, | ||||
|                  vgKey, buf); | ||||
| 
 | ||||
|  | @ -1745,7 +1767,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p | |||
|   sendInfo->msgType = TDMT_VND_TMQ_CONSUME; | ||||
| 
 | ||||
|   int64_t transporterId = 0; | ||||
|   char    offsetFormatBuf[80]; | ||||
|   char    offsetFormatBuf[TSDB_OFFSET_LEN]; | ||||
|   tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset); | ||||
| 
 | ||||
|   tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, | ||||
|  | @ -1882,8 +1904,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { | |||
|         pVg->offsetInfo.walVerEnd = pDataRsp->head.walever; | ||||
|         pVg->receivedInfoFromVnode = true; | ||||
| 
 | ||||
|         char buf[80]; | ||||
|         tFormatOffset(buf, 80, &pDataRsp->rspOffset); | ||||
|         char buf[TSDB_OFFSET_LEN]; | ||||
|         tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); | ||||
|         if (pDataRsp->blockNum == 0) { | ||||
|           tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 | ||||
|                    " total:%" PRId64 " reqId:0x%" PRIx64, | ||||
|  | @ -1990,8 +2012,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { | |||
| 
 | ||||
|         tmq->totalRows += numOfRows; | ||||
| 
 | ||||
|         char buf[80]; | ||||
|         tFormatOffset(buf, 80, &pVg->offsetInfo.currentOffset); | ||||
|         char buf[TSDB_OFFSET_LEN]; | ||||
|         tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); | ||||
|         tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 | ||||
|                  ", vg total:%" PRId64 " total:%" PRId64 " reqId:0x%" PRIx64, | ||||
|                  tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, | ||||
|  | @ -2608,7 +2630,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a | |||
|       sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO; | ||||
| 
 | ||||
|       int64_t transporterId = 0; | ||||
|       char    offsetFormatBuf[80]; | ||||
|       char    offsetFormatBuf[TSDB_OFFSET_LEN]; | ||||
|       tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); | ||||
| 
 | ||||
|       tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, | ||||
|  | @ -2645,7 +2667,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a | |||
| 
 | ||||
|         pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; | ||||
| 
 | ||||
|         char offsetBuf[80] = {0}; | ||||
|         char offsetBuf[TSDB_OFFSET_LEN] = {0}; | ||||
|         tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); | ||||
| 
 | ||||
|         tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf); | ||||
|  |  | |||
|  | @ -291,6 +291,8 @@ static const SSysDbTableSchema subscriptionSchema[] = { | |||
|     {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, | ||||
|     {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, | ||||
|     {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, | ||||
|     {.name = "offset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, | ||||
|     {.name = "rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, | ||||
| }; | ||||
| 
 | ||||
| static const SSysDbTableSchema vnodesSchema[] = { | ||||
|  | @ -359,6 +361,11 @@ static const SSysDbTableSchema consumerSchema[] = { | |||
|     {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, | ||||
|     {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, | ||||
|     {.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, | ||||
|     {.name = "withTbName", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, | ||||
|     {.name = "useSnapshot", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, | ||||
|     {.name = "autoCommit", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, | ||||
|     {.name = "autoCommitInterval", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, | ||||
|     {.name = "resetOffsetCfg", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, | ||||
| }; | ||||
| 
 | ||||
| static const SSysDbTableSchema offsetSchema[] = { | ||||
|  |  | |||
|  | @ -5338,6 +5338,15 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { | |||
|   return 0; | ||||
| } | ||||
| 
 | ||||
| int32_t tDeatroySMqHbReq(SMqHbReq* pReq){ | ||||
|   for(int i = 0; i < taosArrayGetSize(pReq->topics); i++){ | ||||
|     TopicOffsetRows* vgs = taosArrayGet(pReq->topics, i); | ||||
|     if(vgs) taosArrayDestroy(vgs->offsetRows); | ||||
|   } | ||||
|   taosArrayDestroy(pReq->topics); | ||||
|   return 0; | ||||
| } | ||||
| 
 | ||||
| int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { | ||||
|   SEncoder encoder = {0}; | ||||
|   tEncoderInit(&encoder, buf, bufLen); | ||||
|  | @ -5346,6 +5355,21 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { | |||
|   if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1; | ||||
|   if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1; | ||||
| 
 | ||||
|   int32_t sz = taosArrayGetSize(pReq->topics); | ||||
|   if (tEncodeI32(&encoder, sz) < 0) return -1; | ||||
|   for (int32_t i = 0; i < sz; ++i) { | ||||
|     TopicOffsetRows* vgs = (TopicOffsetRows*)taosArrayGet(pReq->topics, i); | ||||
|     if (tEncodeCStr(&encoder, vgs->topicName) < 0) return -1; | ||||
|     int32_t szVgs = taosArrayGetSize(vgs->offsetRows); | ||||
|     if (tEncodeI32(&encoder, szVgs) < 0) return -1; | ||||
|     for (int32_t j = 0; j < szVgs; ++j) { | ||||
|       OffsetRows *offRows = taosArrayGet(vgs->offsetRows, j); | ||||
|       if (tEncodeI32(&encoder, offRows->vgId) < 0) return -1; | ||||
|       if (tEncodeI64(&encoder, offRows->rows) < 0) return -1; | ||||
|       if (tEncodeSTqOffsetVal(&encoder, &offRows->offset) < 0) return -1; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   tEndEncode(&encoder); | ||||
| 
 | ||||
|   int32_t tlen = encoder.pos; | ||||
|  | @ -5362,7 +5386,28 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { | |||
| 
 | ||||
|   if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; | ||||
|   if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1; | ||||
| 
 | ||||
|   int32_t sz = 0; | ||||
|   if (tDecodeI32(&decoder, &sz) < 0) return -1; | ||||
|   if(sz > 0){ | ||||
|     pReq->topics = taosArrayInit(sz, sizeof(TopicOffsetRows)); | ||||
|     if (NULL == pReq->topics) return -1; | ||||
|     for (int32_t i = 0; i < sz; ++i) { | ||||
|       TopicOffsetRows* data = taosArrayReserve(pReq->topics, 1); | ||||
|       tDecodeCStrTo(&decoder, data->topicName); | ||||
|       int32_t szVgs = 0; | ||||
|       if (tDecodeI32(&decoder, &szVgs) < 0) return -1; | ||||
|       if(szVgs > 0){ | ||||
|         data->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); | ||||
|         if (NULL == data->offsetRows) return -1; | ||||
|         for (int32_t j= 0; j < szVgs; ++j) { | ||||
|           OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); | ||||
|           if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1; | ||||
|           if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1; | ||||
|           if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1; | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|   tEndDecode(&decoder); | ||||
| 
 | ||||
|   tDecoderClear(&decoder); | ||||
|  |  | |||
|  | @ -550,12 +550,18 @@ typedef struct { | |||
|   int64_t upTime; | ||||
|   int64_t subscribeTime; | ||||
|   int64_t rebalanceTime; | ||||
| 
 | ||||
|   int8_t         withTbName; | ||||
|   int8_t         useSnapshot; | ||||
|   int8_t         autoCommit; | ||||
|   int32_t        autoCommitInterval; | ||||
|   int32_t        resetOffsetCfg; | ||||
| } SMqConsumerObj; | ||||
| 
 | ||||
| SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); | ||||
| void            tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer); | ||||
| int32_t         tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); | ||||
| void*           tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer); | ||||
| void*           tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver); | ||||
| 
 | ||||
| typedef struct { | ||||
|   int32_t vgId; | ||||
|  | @ -571,12 +577,13 @@ void*    tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp); | |||
| typedef struct { | ||||
|   int64_t consumerId;  // -1 for unassigned
 | ||||
|   SArray* vgs;         // SArray<SMqVgEp*>
 | ||||
|   SArray* offsetRows;  // SArray<OffsetRows*>
 | ||||
| } SMqConsumerEp; | ||||
| 
 | ||||
| SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp); | ||||
| void           tDeleteSMqConsumerEp(void* pEp); | ||||
| //SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp);
 | ||||
| //void           tDeleteSMqConsumerEp(void* pEp);
 | ||||
| int32_t        tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp); | ||||
| void*          tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp); | ||||
| void*          tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp, int8_t sver); | ||||
| 
 | ||||
| typedef struct { | ||||
|   char      key[TSDB_SUBSCRIBE_KEY_LEN]; | ||||
|  | @ -595,7 +602,7 @@ SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]); | |||
| SMqSubscribeObj* tCloneSubscribeObj(const SMqSubscribeObj* pSub); | ||||
| void             tDeleteSubscribeObj(SMqSubscribeObj* pSub); | ||||
| int32_t          tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub); | ||||
| void*            tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub); | ||||
| void*            tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver); | ||||
| 
 | ||||
| typedef struct { | ||||
|   int32_t epoch; | ||||
|  |  | |||
|  | @ -23,7 +23,7 @@ | |||
| #include "tcompare.h" | ||||
| #include "tname.h" | ||||
| 
 | ||||
| #define MND_CONSUMER_VER_NUMBER   1 | ||||
| #define MND_CONSUMER_VER_NUMBER   2 | ||||
| #define MND_CONSUMER_RESERVE_SIZE 64 | ||||
| 
 | ||||
| #define MND_CONSUMER_LOST_HB_CNT          6 | ||||
|  | @ -391,12 +391,13 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { | |||
| } | ||||
| 
 | ||||
| static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { | ||||
|   int32_t code = 0; | ||||
|   SMnode  *pMnode = pMsg->info.node; | ||||
|   SMqHbReq req = {0}; | ||||
| 
 | ||||
|   if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { | ||||
|   if ((code = tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req)) < 0) { | ||||
|     terrno = TSDB_CODE_OUT_OF_MEMORY; | ||||
|     return -1; | ||||
|     goto end; | ||||
|   } | ||||
| 
 | ||||
|   int64_t         consumerId = req.consumerId; | ||||
|  | @ -404,7 +405,8 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { | |||
|   if (pConsumer == NULL) { | ||||
|     mError("consumer:0x%" PRIx64 " not exist", consumerId); | ||||
|     terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; | ||||
|     return -1; | ||||
|     code = -1; | ||||
|     goto end; | ||||
|   } | ||||
| 
 | ||||
|   atomic_store_32(&pConsumer->hbStatus, 0); | ||||
|  | @ -424,9 +426,25 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { | |||
|     tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); | ||||
|   } | ||||
| 
 | ||||
|   for(int i = 0; i < taosArrayGetSize(req.topics); i++){ | ||||
|     TopicOffsetRows* data = taosArrayGet(req.topics, i); | ||||
|     mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); | ||||
| 
 | ||||
|     SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); | ||||
|     taosRLockLatch(&pSub->lock); | ||||
|     SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); | ||||
|     pConsumerEp->offsetRows = data->offsetRows; | ||||
|     data->offsetRows = NULL; | ||||
|     taosRUnLockLatch(&pSub->lock); | ||||
| 
 | ||||
|     mndReleaseSubscribe(pMnode, pSub); | ||||
|   } | ||||
| 
 | ||||
|   mndReleaseConsumer(pMnode, pConsumer); | ||||
| 
 | ||||
|   return 0; | ||||
| end: | ||||
|   tDeatroySMqHbReq(&req); | ||||
|   return code; | ||||
| } | ||||
| 
 | ||||
| static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { | ||||
|  | @ -675,6 +693,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { | |||
|     pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); | ||||
|     tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); | ||||
| 
 | ||||
|     pConsumerNew->withTbName = subscribe.withTbName; | ||||
|     pConsumerNew->useSnapshot = subscribe.useSnapshot; | ||||
|     pConsumerNew->autoCommit = subscribe.autoCommit; | ||||
|     pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval; | ||||
|     pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg; | ||||
| 
 | ||||
|     // set the update type
 | ||||
|     pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE; | ||||
|     taosArrayDestroy(pConsumerNew->assignedTopics); | ||||
|  | @ -822,7 +846,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { | |||
|     goto CM_DECODE_OVER; | ||||
|   } | ||||
| 
 | ||||
|   if (sver != MND_CONSUMER_VER_NUMBER) { | ||||
|   if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) { | ||||
|     terrno = TSDB_CODE_SDB_INVALID_DATA_VER; | ||||
|     goto CM_DECODE_OVER; | ||||
|   } | ||||
|  | @ -849,7 +873,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { | |||
|   SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER); | ||||
|   SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER); | ||||
| 
 | ||||
|   if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) { | ||||
|   if (tDecodeSMqConsumerObj(buf, pConsumer, sver) == NULL) { | ||||
|     terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
 | ||||
|     goto CM_DECODE_OVER; | ||||
|   } | ||||
|  | @ -1159,6 +1183,21 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * | |||
|       pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|       colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0); | ||||
| 
 | ||||
|       pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|       colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->withTbName, false); | ||||
| 
 | ||||
|       pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|       colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->useSnapshot, false); | ||||
| 
 | ||||
|       pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|       colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommit, false); | ||||
| 
 | ||||
|       pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|       colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommitInterval, false); | ||||
| 
 | ||||
|       pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|       colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->resetOffsetCfg, false); | ||||
| 
 | ||||
|       numOfRows++; | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
|  | @ -321,10 +321,15 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { | |||
|     tlen += taosEncodeFixedI32(buf, 0); | ||||
|   } | ||||
| 
 | ||||
|   tlen += taosEncodeFixedI8(buf, pConsumer->withTbName); | ||||
|   tlen += taosEncodeFixedI8(buf, pConsumer->useSnapshot); | ||||
|   tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit); | ||||
|   tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval); | ||||
|   tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); | ||||
|   return tlen; | ||||
| } | ||||
| 
 | ||||
| void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) { | ||||
| void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) { | ||||
|   int32_t sz; | ||||
|   buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); | ||||
|   buf = taosDecodeStringTo(buf, pConsumer->clientId); | ||||
|  | @ -375,50 +380,96 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) { | |||
|     taosArrayPush(pConsumer->assignedTopics, &topic); | ||||
|   } | ||||
| 
 | ||||
|   if(sver > 1){ | ||||
|     buf = taosDecodeFixedI8(buf, &pConsumer->withTbName); | ||||
|     buf = taosDecodeFixedI8(buf, &pConsumer->useSnapshot); | ||||
|     buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit); | ||||
|     buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); | ||||
|     buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg); | ||||
|   } | ||||
| 
 | ||||
|   return (void *)buf; | ||||
| } | ||||
| 
 | ||||
| SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { | ||||
|   SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); | ||||
|   if (pConsumerEpNew == NULL) return NULL; | ||||
|   pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; | ||||
|   pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp); | ||||
|   return pConsumerEpNew; | ||||
| } | ||||
| 
 | ||||
| void tDeleteSMqConsumerEp(void *data) { | ||||
|   SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; | ||||
|   taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); | ||||
| } | ||||
| //SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
 | ||||
| //  SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
 | ||||
| //  if (pConsumerEpNew == NULL) return NULL;
 | ||||
| //  pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
 | ||||
| //  pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
 | ||||
| //  return pConsumerEpNew;
 | ||||
| //}
 | ||||
| //
 | ||||
| //void tDeleteSMqConsumerEp(void *data) {
 | ||||
| //  SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
 | ||||
| //  taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
 | ||||
| //}
 | ||||
| 
 | ||||
| int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { | ||||
|   int32_t tlen = 0; | ||||
|   tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); | ||||
|   tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp); | ||||
| #if 0 | ||||
|   int32_t sz = taosArrayGetSize(pConsumerEp->vgs); | ||||
|   tlen += taosEncodeFixedI32(buf, sz); | ||||
|   for (int32_t i = 0; i < sz; i++) { | ||||
|     SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); | ||||
|     tlen += tEncodeSMqVgEp(buf, pVgEp); | ||||
|   int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows); | ||||
|   tlen += taosEncodeFixedI32(buf, szVgs); | ||||
|   for (int32_t j= 0; j < szVgs; ++j) { | ||||
|     OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j); | ||||
|     tlen += taosEncodeFixedI32(buf, offRows->vgId); | ||||
|     tlen += taosEncodeFixedI64(buf, offRows->rows); | ||||
|     tlen += taosEncodeFixedI8(buf, offRows->offset.type); | ||||
|     if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { | ||||
|       tlen += taosEncodeFixedI64(buf, offRows->offset.uid); | ||||
|       tlen += taosEncodeFixedI64(buf, offRows->offset.ts); | ||||
|     } else if (offRows->offset.type == TMQ_OFFSET__LOG) { | ||||
|       tlen += taosEncodeFixedI64(buf, offRows->offset.version); | ||||
|     } else { | ||||
|       // do nothing
 | ||||
|     } | ||||
|   } | ||||
| #endif | ||||
| //#if 0
 | ||||
| //  int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
 | ||||
| //  tlen += taosEncodeFixedI32(buf, sz);
 | ||||
| //  for (int32_t i = 0; i < sz; i++) {
 | ||||
| //    SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
 | ||||
| //    tlen += tEncodeSMqVgEp(buf, pVgEp);
 | ||||
| //  }
 | ||||
| //#endif
 | ||||
|   return tlen; | ||||
| } | ||||
| 
 | ||||
| void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) { | ||||
| void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) { | ||||
|   buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); | ||||
|   buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp)); | ||||
| #if 0 | ||||
|   int32_t sz; | ||||
|   buf = taosDecodeFixedI32(buf, &sz); | ||||
|   pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *)); | ||||
|   for (int32_t i = 0; i < sz; i++) { | ||||
|     SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); | ||||
|     buf = tDecodeSMqVgEp(buf, pVgEp); | ||||
|     taosArrayPush(pConsumerEp->vgs, &pVgEp); | ||||
|   if (sver > 1){ | ||||
|     int32_t szVgs = 0; | ||||
|     buf = taosDecodeFixedI32(buf, &szVgs); | ||||
|     if(szVgs > 0){ | ||||
|       pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); | ||||
|       if (NULL == pConsumerEp->offsetRows) return NULL; | ||||
|       for (int32_t j= 0; j < szVgs; ++j) { | ||||
|         OffsetRows* offRows = taosArrayReserve(pConsumerEp->offsetRows, 1); | ||||
|         buf = taosDecodeFixedI32(buf, &offRows->vgId); | ||||
|         buf = taosDecodeFixedI64(buf, &offRows->rows); | ||||
|         buf = taosDecodeFixedI8(buf, &offRows->offset.type); | ||||
|         if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { | ||||
|           buf = taosDecodeFixedI64(buf, &offRows->offset.uid); | ||||
|           buf = taosDecodeFixedI64(buf, &offRows->offset.ts); | ||||
|         } else if (offRows->offset.type == TMQ_OFFSET__LOG) { | ||||
|           buf = taosDecodeFixedI64(buf, &offRows->offset.version); | ||||
|         } else { | ||||
|           // do nothing
 | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|   } | ||||
| #endif | ||||
| //#if 0
 | ||||
| //  int32_t sz;
 | ||||
| //  buf = taosDecodeFixedI32(buf, &sz);
 | ||||
| //  pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
 | ||||
| //  for (int32_t i = 0; i < sz; i++) {
 | ||||
| //    SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
 | ||||
| //    buf = tDecodeSMqVgEp(buf, pVgEp);
 | ||||
| //    taosArrayPush(pConsumerEp->vgs, &pVgEp);
 | ||||
| //  }
 | ||||
| //#endif
 | ||||
| 
 | ||||
|   return (void *)buf; | ||||
| } | ||||
|  | @ -479,6 +530,7 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { | |||
|     if (pIter == NULL) break; | ||||
|     SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; | ||||
|     taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); | ||||
|     taosArrayDestroy(pConsumerEp->offsetRows); | ||||
|   } | ||||
|   taosHashCleanup(pSub->consumerHash); | ||||
|   taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp); | ||||
|  | @ -511,7 +563,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { | |||
|   return tlen; | ||||
| } | ||||
| 
 | ||||
| void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { | ||||
| void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { | ||||
|   //
 | ||||
|   buf = taosDecodeStringTo(buf, pSub->key); | ||||
|   buf = taosDecodeFixedI64(buf, &pSub->dbUid); | ||||
|  | @ -526,7 +578,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { | |||
|   pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); | ||||
|   for (int32_t i = 0; i < sz; i++) { | ||||
|     SMqConsumerEp consumerEp = {0}; | ||||
|     buf = tDecodeSMqConsumerEp(buf, &consumerEp); | ||||
|     buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver); | ||||
|     taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)); | ||||
|   } | ||||
| 
 | ||||
|  | @ -535,65 +587,65 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { | |||
|   return (void *)buf; | ||||
| } | ||||
| 
 | ||||
| SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { | ||||
|   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry)); | ||||
|   if (pEntryNew == NULL) return NULL; | ||||
|   pEntryNew->epoch = pEntry->epoch; | ||||
|   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp); | ||||
|   return pEntryNew; | ||||
| } | ||||
| //SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
 | ||||
| //  SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
 | ||||
| //  if (pEntryNew == NULL) return NULL;
 | ||||
| //  pEntryNew->epoch = pEntry->epoch;
 | ||||
| //  pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
 | ||||
| //  return pEntryNew;
 | ||||
| //}
 | ||||
| //
 | ||||
| //void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
 | ||||
| //  taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
 | ||||
| //}
 | ||||
| 
 | ||||
| void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { | ||||
|   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp); | ||||
| } | ||||
| //int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
 | ||||
| //  int32_t tlen = 0;
 | ||||
| //  tlen += taosEncodeFixedI32(buf, pEntry->epoch);
 | ||||
| //  tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
 | ||||
| //  return tlen;
 | ||||
| //}
 | ||||
| //
 | ||||
| //void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
 | ||||
| //  buf = taosDecodeFixedI32(buf, &pEntry->epoch);
 | ||||
| //  buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
 | ||||
| //  return (void *)buf;
 | ||||
| //}
 | ||||
| 
 | ||||
| int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) { | ||||
|   int32_t tlen = 0; | ||||
|   tlen += taosEncodeFixedI32(buf, pEntry->epoch); | ||||
|   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry); | ||||
|   return tlen; | ||||
| } | ||||
| //SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
 | ||||
| //  SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
 | ||||
| //  if (pLogNew == NULL) return pLogNew;
 | ||||
| //  memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
 | ||||
| //  pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
 | ||||
| //  return pLogNew;
 | ||||
| //}
 | ||||
| //
 | ||||
| //void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
 | ||||
| //  taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
 | ||||
| //}
 | ||||
| 
 | ||||
| void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) { | ||||
|   buf = taosDecodeFixedI32(buf, &pEntry->epoch); | ||||
|   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); | ||||
|   return (void *)buf; | ||||
| } | ||||
| 
 | ||||
| SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) { | ||||
|   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj)); | ||||
|   if (pLogNew == NULL) return pLogNew; | ||||
|   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN); | ||||
|   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp); | ||||
|   return pLogNew; | ||||
| } | ||||
| 
 | ||||
| void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) { | ||||
|   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp); | ||||
| } | ||||
| 
 | ||||
| int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) { | ||||
|   int32_t tlen = 0; | ||||
|   tlen += taosEncodeString(buf, pLog->key); | ||||
|   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry); | ||||
|   return tlen; | ||||
| } | ||||
| 
 | ||||
| void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) { | ||||
|   buf = taosDecodeStringTo(buf, pLog->key); | ||||
|   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); | ||||
|   return (void *)buf; | ||||
| } | ||||
| 
 | ||||
| int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) { | ||||
|   int32_t tlen = 0; | ||||
|   tlen += taosEncodeString(buf, pOffset->key); | ||||
|   tlen += taosEncodeFixedI64(buf, pOffset->offset); | ||||
|   return tlen; | ||||
| } | ||||
| 
 | ||||
| void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) { | ||||
|   buf = taosDecodeStringTo(buf, pOffset->key); | ||||
|   buf = taosDecodeFixedI64(buf, &pOffset->offset); | ||||
|   return buf; | ||||
| } | ||||
| //int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
 | ||||
| //  int32_t tlen = 0;
 | ||||
| //  tlen += taosEncodeString(buf, pLog->key);
 | ||||
| //  tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
 | ||||
| //  return tlen;
 | ||||
| //}
 | ||||
| //
 | ||||
| //void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
 | ||||
| //  buf = taosDecodeStringTo(buf, pLog->key);
 | ||||
| //  buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
 | ||||
| //  return (void *)buf;
 | ||||
| //}
 | ||||
| //
 | ||||
| //int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
 | ||||
| //  int32_t tlen = 0;
 | ||||
| //  tlen += taosEncodeString(buf, pOffset->key);
 | ||||
| //  tlen += taosEncodeFixedI64(buf, pOffset->offset);
 | ||||
| //  return tlen;
 | ||||
| //}
 | ||||
| //
 | ||||
| //void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
 | ||||
| //  buf = taosDecodeStringTo(buf, pOffset->key);
 | ||||
| //  buf = taosDecodeFixedI64(buf, &pOffset->offset);
 | ||||
| //  return buf;
 | ||||
| //}
 | ||||
|  |  | |||
|  | @ -24,7 +24,7 @@ | |||
| #include "tcompare.h" | ||||
| #include "tname.h" | ||||
| 
 | ||||
| #define MND_SUBSCRIBE_VER_NUMBER   1 | ||||
| #define MND_SUBSCRIBE_VER_NUMBER   2 | ||||
| #define MND_SUBSCRIBE_RESERVE_SIZE 64 | ||||
| 
 | ||||
| #define MND_SUBSCRIBE_REBALANCE_CNT 3 | ||||
|  | @ -809,7 +809,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { | |||
|   int8_t sver = 0; | ||||
|   if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER; | ||||
| 
 | ||||
|   if (sver != MND_SUBSCRIBE_VER_NUMBER) { | ||||
|   if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) { | ||||
|     terrno = TSDB_CODE_SDB_INVALID_DATA_VER; | ||||
|     goto SUB_DECODE_OVER; | ||||
|   } | ||||
|  | @ -828,7 +828,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { | |||
|   SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); | ||||
|   SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); | ||||
| 
 | ||||
|   if (tDecodeSubscribeObj(buf, pSub) == NULL) { | ||||
|   if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) { | ||||
|     goto SUB_DECODE_OVER; | ||||
|   } | ||||
| 
 | ||||
|  | @ -1087,15 +1087,39 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock | |||
|                pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId); | ||||
| 
 | ||||
|         // offset
 | ||||
| #if 0 | ||||
|       // subscribe time
 | ||||
|       pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|       colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); | ||||
| 
 | ||||
|       // rebalance time
 | ||||
|       pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|       colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); | ||||
| #endif | ||||
|         OffsetRows *data = NULL; | ||||
|         for(int i = 0; i < taosArrayGetSize(pConsumerEp->offsetRows); i++){ | ||||
|           OffsetRows *tmp = taosArrayGet(pConsumerEp->offsetRows, i); | ||||
|           if(data->vgId != pVgEp->vgId){ | ||||
|             continue; | ||||
|           } | ||||
|           data = tmp; | ||||
|         } | ||||
|         if(data){ | ||||
|           // vg id
 | ||||
|           char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; | ||||
|           tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset); | ||||
|           varDataSetLen(buf, strlen(varDataVal(buf))); | ||||
|           pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|           colDataSetVal(pColInfo, numOfRows, (const char *)buf, false); | ||||
|           pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|           colDataSetVal(pColInfo, numOfRows, (const char *)&data->rows, false); | ||||
|         }else{ | ||||
|           pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|           colDataSetNULL(pColInfo, numOfRows); | ||||
|           pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); | ||||
|           colDataSetNULL(pColInfo, numOfRows); | ||||
|           mError("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId); | ||||
|         } | ||||
| //#if 0
 | ||||
| //      // subscribe time
 | ||||
| //      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
 | ||||
| //      colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
 | ||||
| //
 | ||||
| //      // rebalance time
 | ||||
| //      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
 | ||||
| //      colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
 | ||||
| //#endif
 | ||||
| 
 | ||||
|         numOfRows++; | ||||
|       } | ||||
|  |  | |||
|  | @ -243,8 +243,8 @@ int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { | |||
|   tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, | ||||
|                   ever); | ||||
| 
 | ||||
|   char buf1[80] = {0}; | ||||
|   char buf2[80] = {0}; | ||||
|   char buf1[TSDB_OFFSET_LEN] = {0}; | ||||
|   char buf2[TSDB_OFFSET_LEN] = {0}; | ||||
|   tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); | ||||
|   tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); | ||||
|   tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, | ||||
|  | @ -259,10 +259,10 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* | |||
| 
 | ||||
|   tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever); | ||||
| 
 | ||||
|   char buf1[80] = {0}; | ||||
|   char buf2[80] = {0}; | ||||
|   tFormatOffset(buf1, 80, &pRsp->reqOffset); | ||||
|   tFormatOffset(buf2, 80, &pRsp->rspOffset); | ||||
|   char buf1[TSDB_OFFSET_LEN] = {0}; | ||||
|   char buf2[TSDB_OFFSET_LEN] = {0}; | ||||
|   tFormatOffset(buf1, TSDB_OFFSET_LEN, &pRsp->reqOffset); | ||||
|   tFormatOffset(buf2, TSDB_OFFSET_LEN, &pRsp->rspOffset); | ||||
| 
 | ||||
|   tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId, | ||||
|           pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); | ||||
|  | @ -481,8 +481,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { | |||
|     pHandle->epoch = reqEpoch; | ||||
|   } | ||||
| 
 | ||||
|   char buf[80]; | ||||
|   tFormatOffset(buf, 80, &reqOffset); | ||||
|   char buf[TSDB_OFFSET_LEN]; | ||||
|   tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); | ||||
|   tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, | ||||
|           consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); | ||||
| 
 | ||||
|  |  | |||
|  | @ -99,8 +99,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand | |||
|   if (pOffset != NULL) { | ||||
|     *pOffsetVal = pOffset->val; | ||||
| 
 | ||||
|     char formatBuf[80]; | ||||
|     tFormatOffset(formatBuf, 80, pOffsetVal); | ||||
|     char formatBuf[TSDB_OFFSET_LEN]; | ||||
|     tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal); | ||||
|     tqDebug("tmq poll: consumer:0x%" PRIx64 | ||||
|             ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64, | ||||
|             consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId); | ||||
|  | @ -186,8 +186,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, | |||
|   code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); | ||||
| 
 | ||||
| end : { | ||||
|   char buf[80] = {0}; | ||||
|   tFormatOffset(buf, 80, &dataRsp.rspOffset); | ||||
|   char buf[TSDB_OFFSET_LEN] = {0}; | ||||
|   tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset); | ||||
|   tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 | ||||
|           " code:%d", | ||||
|           consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue