consume skip ununsed table

This commit is contained in:
Liu Jicong 2022-01-21 19:00:09 +08:00
parent 99984adb45
commit a1c6c94c0b
4 changed files with 82 additions and 48 deletions

View File

@ -1590,8 +1590,8 @@ typedef struct SMqCVConsumeReq {
typedef struct SMqConsumeRspBlock { typedef struct SMqConsumeRspBlock {
int32_t bodyLen; int32_t bodyLen;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char body[]; char body[];
} SMqConsumeRspBlock; } SMqConsumeRspBlock;
typedef struct SMqCVConsumeRsp { typedef struct SMqCVConsumeRsp {

View File

@ -68,12 +68,13 @@ typedef struct {
typedef struct STqReadHandle { typedef struct STqReadHandle {
int64_t ver; int64_t ver;
int64_t tbUid;
SSubmitMsg* pMsg; SSubmitMsg* pMsg;
SSubmitBlk* pBlock; SSubmitBlk* pBlock;
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter; SSubmitBlkIter blkIter;
SMeta* pMeta; SMeta* pMeta;
SArray* pColumnIdList; SArray* pColIdList;
} STqReadHandle; } STqReadHandle;
/* ------------------------ SVnode ------------------------ */ /* ------------------------ SVnode ------------------------ */
@ -199,8 +200,12 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta); STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta);
static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColumnIdList) { static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) {
pReadHandle->pColumnIdList = pColumnIdList; pReadHandle->pColIdList = pColIdList;
}
static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) {
pHandle->tbUid = tbUid;
} }
void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver); void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver);

View File

@ -15,6 +15,7 @@
#include "tqInt.h" #include "tqInt.h"
#include "tqMetaStore.h" #include "tqMetaStore.h"
#include "tcompare.h"
// static // static
// read next version data // read next version data
@ -424,7 +425,7 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/ /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
/*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/ /*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/
/*if (code < 0) {*/ /*if (code < 0) {*/
// TODO: error // TODO: error
/*}*/ /*}*/
// get msgType // get msgType
// if submitblk // if submitblk
@ -610,61 +611,91 @@ int tqItemSSize() {
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
SMqCVConsumeReq* pReq = pMsg->pCont; SMqCVConsumeReq* pReq = pMsg->pCont;
SRpcMsg rpcMsg;
int64_t reqId = pReq->reqId; int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t reqOffset = pReq->offset; int64_t reqOffset = pReq->offset;
int64_t fetchOffset = reqOffset; int64_t fetchOffset = reqOffset;
int64_t blockingTime = pReq->blockingTime; int64_t blockingTime = pReq->blockingTime;
int rspLen = 0;
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
int sz = taosArrayGetSize(pConsumer->topics); int sz = taosArrayGetSize(pConsumer->topics);
for (int i = 0 ; i < sz; i++) { for (int i = 0; i < sz; i++) {
STqTopicHandle *pTopic = taosArrayGet(pConsumer->topics, i); STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
int8_t pos = fetchOffset % TQ_BUFFER_SIZE; int8_t pos;
int8_t old = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1); int8_t skip = 0;
if (old == 1) { SWalHead* pHead;
// do nothing
continue;
}
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
return -1;
}
SWalHead* pHead = pTopic->pReadhandle->pHead;
while (1) { while (1) {
// read until find TDMT_VND_SUBMIT pos = fetchOffset % TQ_BUFFER_SIZE;
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1);
return -1; if (skip == 1) {
// do nothing
break;
} }
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
// check err
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
break;
}
// read until find TDMT_VND_SUBMIT
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
break;
}
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
break;
}
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
fetchOffset++;
} }
if (skip == 1) continue;
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
void* task = pTopic->buffer.output[pos].task; qTaskInfo_t task = pTopic->buffer.output[pos].task;
qSetStreamInput(task, pCont); qSetStreamInput(task, pCont);
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
//SArray<SSDataBlock>
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
break;
}
if (pDataBlock != NULL) {
taosArrayPush(pRes, pDataBlock);
} else {
break;
}
} }
// TODO: launch query and get output data
pTopic->buffer.output[pos].dst = pDataBlock; atomic_store_8(&pTopic->buffer.output[pos].status, 0);
if (pTopic->buffer.firstOffset == -1
|| pReq->offset < pTopic->buffer.firstOffset) { if (taosArrayGetSize(pRes) == 0) {
taosArrayDestroy(pRes);
fetchOffset++;
continue;
}
pTopic->buffer.output[pos].dst = pRes;
if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) {
pTopic->buffer.firstOffset = pReq->offset; pTopic->buffer.firstOffset = pReq->offset;
} }
if (pTopic->buffer.lastOffset == -1 if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
|| pReq->offset > pTopic->buffer.lastOffset) {
pTopic->buffer.lastOffset = pReq->offset; pTopic->buffer.lastOffset = pReq->offset;
} }
atomic_store_8(&pTopic->buffer.output[pos].status, 1);
// put output into rsp // put output into rsp
} }
// launch query // launch query
// get result // get result
SMqCvConsumeRsp* pRsp;
return 0; return 0;
} }
@ -673,14 +704,14 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
if (pConsumer == NULL) { if (pConsumer == NULL) {
return -1; return -1;
} }
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
if (pTopic == NULL) { if (pTopic == NULL) {
free(pConsumer); free(pConsumer);
return -1; return -1;
} }
strcpy(pTopic->topicName, pReq->topicName); strcpy(pTopic->topicName, pReq->topicName);
strcpy(pTopic->cgroup, pReq->cgroup); strcpy(pTopic->cgroup, pReq->cgroup);
strcpy(pTopic->sql, pReq->sql); strcpy(pTopic->sql, pReq->sql);
strcpy(pTopic->logicalPlan, pReq->logicalPlan); strcpy(pTopic->logicalPlan, pReq->logicalPlan);
strcpy(pTopic->physicalPlan, pReq->physicalPlan); strcpy(pTopic->physicalPlan, pReq->physicalPlan);
@ -689,7 +720,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
pTopic->buffer.lastOffset = -1; pTopic->buffer.lastOffset = -1;
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
if (pTopic->pReadhandle == NULL) { if (pTopic->pReadhandle == NULL) {
} }
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
@ -708,7 +738,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
pReadHandle->pMeta = pMeta; pReadHandle->pMeta = pMeta;
pReadHandle->pMsg = NULL; pReadHandle->pMsg = NULL;
pReadHandle->ver = -1; pReadHandle->ver = -1;
pReadHandle->pColumnIdList = NULL; pReadHandle->pColIdList = NULL;
return NULL; return NULL;
} }
@ -720,20 +750,18 @@ void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ve
} }
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) >= 0) {
return false; if (pHandle->tbUid == pHandle->pBlock->uid) return true;
} }
return true; return false;
} }
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
SMemRow row; /*int32_t sversion = pHandle->pBlock->sversion;*/
int32_t sversion = pHandle->pBlock->sversion; /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/
SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false); pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->numOfCols = pSchema->nCols;
pBlockInfo->rows = pHandle->pBlock->numOfRows; pBlockInfo->rows = pHandle->pBlock->numOfRows;
pBlockInfo->uid = pHandle->pBlock->uid; pBlockInfo->uid = pHandle->pBlock->uid;
// TODO: filter out unused column
return 0; return 0;
} }

View File

@ -29,6 +29,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input) {
return doSetStreamBlock(pOperator->pDownstream[0], input); return doSetStreamBlock(pOperator->pDownstream[0], input);
} }
return TSDB_CODE_QRY_APP_ERROR;
} else { } else {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
tqReadHandleSetMsg(pInfo->readerHandle, input, 0); tqReadHandleSetMsg(pInfo->readerHandle, input, 0);