Merge pull request #9968 from taosdata/feature/tq

consume skip ununsed table
This commit is contained in:
Liu Jicong 2022-01-21 23:29:38 +08:00 committed by GitHub
commit e9fd92ff68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 38 additions and 23 deletions

View File

@ -1579,32 +1579,47 @@ typedef struct SMqSetCVgRsp {
char cGroup[TSDB_CONSUMER_GROUP_LEN]; char cGroup[TSDB_CONSUMER_GROUP_LEN];
} SMqSetCVgRsp; } SMqSetCVgRsp;
typedef struct SMqCVConsumeReq { typedef struct SMqConsumeReq {
int64_t reqId; int64_t reqId;
int64_t offset; int64_t offset;
int64_t consumerId; int64_t consumerId;
int64_t blockingTime; int64_t blockingTime;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqCVConsumeReq; } SMqConsumeReq;
typedef struct SMqConsumeRspBlock { typedef struct SMqColData {
int32_t bodyLen; int16_t colId;
char topicName[TSDB_TOPIC_FNAME_LEN]; int16_t type;
char body[]; int16_t bytes;
} SMqConsumeRspBlock; char data[];
} SMqColData;
typedef struct SMqCVConsumeRsp { typedef struct SMqTbData {
int64_t reqId; int64_t uid;
int64_t clientId; int32_t numOfCols;
int64_t committedOffset; int32_t numOfRows;
int64_t receiveOffset; SMqColData colData[];
int64_t rspOffset; } SMqTbData;
int32_t skipLogNum;
int32_t bodyLen; typedef struct SMqTopicBlk {
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
SMqConsumeRspBlock blocks[]; int64_t committedOffset;
} SMqCvConsumeRsp; int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t bodyLen;
int32_t numOfTb;
SMqTbData tbData[];
} SMqTopicData;
typedef struct SMqConsumeRsp {
int64_t reqId;
int64_t clientId;
int32_t bodyLen;
int32_t numOfTopics;
SMqTopicData data[];
} SMqConsumeRsp;
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -117,7 +117,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet; p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p); /*p->pAppHbMgr = appHbMgrInit(p);*/
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p; pInst = &p;

View File

@ -72,7 +72,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY}; SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL); /*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/
// pRequest->body.resInfo.pRspMsg = pMsg->pData; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,

View File

@ -68,7 +68,7 @@ typedef struct {
typedef struct STqReadHandle { typedef struct STqReadHandle {
int64_t ver; int64_t ver;
int64_t tbUid; uint64_t tbUid;
SSubmitMsg* pMsg; SSubmitMsg* pMsg;
SSubmitBlk* pBlock; SSubmitBlk* pBlock;
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
@ -204,7 +204,7 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SA
pReadHandle->pColIdList = pColIdList; pReadHandle->pColIdList = pColIdList;
} }
static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) { static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t tbUid) {
pHandle->tbUid = tbUid; pHandle->tbUid = tbUid;
} }

View File

@ -610,7 +610,7 @@ int tqItemSSize() {
} }
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
SMqCVConsumeReq* pReq = pMsg->pCont; SMqConsumeReq* pReq = pMsg->pCont;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
int64_t reqId = pReq->reqId; int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;