consume skip ununsed table

This commit is contained in:
Liu Jicong 2022-01-21 23:19:54 +08:00
parent 7c5301acec
commit fc99fe53de
5 changed files with 38 additions and 23 deletions

View File

@ -1579,32 +1579,47 @@ typedef struct SMqSetCVgRsp {
char cGroup[TSDB_CONSUMER_GROUP_LEN]; char cGroup[TSDB_CONSUMER_GROUP_LEN];
} SMqSetCVgRsp; } SMqSetCVgRsp;
typedef struct SMqCVConsumeReq { typedef struct SMqConsumeReq {
int64_t reqId; int64_t reqId;
int64_t offset; int64_t offset;
int64_t consumerId; int64_t consumerId;
int64_t blockingTime; int64_t blockingTime;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqCVConsumeReq; } SMqConsumeReq;
typedef struct SMqConsumeRspBlock { typedef struct SMqColData {
int32_t bodyLen; int16_t colId;
int16_t type;
int16_t bytes;
char data[];
} SMqColData;
typedef struct SMqTbData {
int64_t uid;
int32_t numOfCols;
int32_t numOfRows;
SMqColData colData[];
} SMqTbData;
typedef struct SMqTopicBlk {
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char body[];
} SMqConsumeRspBlock;
typedef struct SMqCVConsumeRsp {
int64_t reqId;
int64_t clientId;
int64_t committedOffset; int64_t committedOffset;
int64_t receiveOffset; int64_t reqOffset;
int64_t rspOffset; int64_t rspOffset;
int32_t skipLogNum; int32_t skipLogNum;
int32_t bodyLen; int32_t bodyLen;
char topicName[TSDB_TOPIC_FNAME_LEN]; int32_t numOfTb;
SMqConsumeRspBlock blocks[]; SMqTbData tbData[];
} SMqCvConsumeRsp; } SMqTopicData;
typedef struct SMqConsumeRsp {
int64_t reqId;
int64_t clientId;
int32_t bodyLen;
int32_t numOfTopics;
SMqTopicData data[];
} SMqConsumeRsp;
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -117,7 +117,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet; p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p); /*p->pAppHbMgr = appHbMgrInit(p);*/
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p; pInst = &p;

View File

@ -72,7 +72,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY}; SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL); /*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/
// pRequest->body.resInfo.pRspMsg = pMsg->pData; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,

View File

@ -68,7 +68,7 @@ typedef struct {
typedef struct STqReadHandle { typedef struct STqReadHandle {
int64_t ver; int64_t ver;
int64_t tbUid; uint64_t tbUid;
SSubmitMsg* pMsg; SSubmitMsg* pMsg;
SSubmitBlk* pBlock; SSubmitBlk* pBlock;
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
@ -204,7 +204,7 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SA
pReadHandle->pColIdList = pColIdList; pReadHandle->pColIdList = pColIdList;
} }
static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) { static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t tbUid) {
pHandle->tbUid = tbUid; pHandle->tbUid = tbUid;
} }

View File

@ -610,7 +610,7 @@ int tqItemSSize() {
} }
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
SMqCVConsumeReq* pReq = pMsg->pCont; SMqConsumeReq* pReq = pMsg->pCont;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
int64_t reqId = pReq->reqId; int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;