refine tqRead interface
This commit is contained in:
parent
5b51229577
commit
6a3d98cda4
|
@ -1520,7 +1520,7 @@ typedef struct SMqSetCVgReq {
|
|||
int32_t vgId;
|
||||
int64_t consumerId;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
|
@ -1532,7 +1532,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
|
|||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
||||
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
||||
tlen += taosEncodeString(buf, pReq->topicName);
|
||||
tlen += taosEncodeString(buf, pReq->cGroup);
|
||||
tlen += taosEncodeString(buf, pReq->cgroup);
|
||||
tlen += taosEncodeString(buf, pReq->sql);
|
||||
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
||||
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
||||
|
@ -1543,7 +1543,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
|||
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
||||
buf = taosDecodeStringTo(buf, pReq->topicName);
|
||||
buf = taosDecodeStringTo(buf, pReq->cGroup);
|
||||
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
||||
buf = taosDecodeString(buf, &pReq->sql);
|
||||
buf = taosDecodeString(buf, &pReq->logicalPlan);
|
||||
buf = taosDecodeString(buf, &pReq->physicalPlan);
|
||||
|
|
|
@ -100,7 +100,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
.vgId = pCEp->vgId,
|
||||
.consumerId = consumerId,
|
||||
};
|
||||
strcpy(req.cGroup, cgroup);
|
||||
strcpy(req.cgroup, cgroup);
|
||||
strcpy(req.topicName, topic);
|
||||
strcpy(req.sql, pTopic->sql);
|
||||
strcpy(req.logicalPlan, pTopic->logicalPlan);
|
||||
|
@ -168,7 +168,7 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
|
|||
.vgId = vgId,
|
||||
.consumerId = pConsumer->consumerId,
|
||||
};
|
||||
strcpy(req.cGroup, pConsumer->cgroup);
|
||||
strcpy(req.cgroup, pConsumer->cgroup);
|
||||
strcpy(req.topicName, pTopic->name);
|
||||
strcpy(req.sql, pTopic->sql);
|
||||
strcpy(req.logicalPlan, pTopic->logicalPlan);
|
||||
|
|
|
@ -297,7 +297,7 @@ typedef struct STQ {
|
|||
STqCfg* tqConfig;
|
||||
STqMemRef tqMemRef;
|
||||
STqMetaStore* tqMeta;
|
||||
SWal * pWal;
|
||||
SWal* pWal;
|
||||
} STQ;
|
||||
|
||||
typedef struct STqMgmt {
|
||||
|
@ -331,7 +331,8 @@ int tqRegisterContext(STqGroup*, void* ahandle);
|
|||
int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
|
||||
#endif
|
||||
|
||||
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
|
||||
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
|
||||
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq);
|
||||
|
||||
typedef struct STqReadHandle {
|
||||
int64_t ver;
|
||||
|
@ -340,13 +341,15 @@ typedef struct STqReadHandle {
|
|||
SSubmitMsgIter msgIter;
|
||||
SSubmitBlkIter blkIter;
|
||||
SMeta* pMeta;
|
||||
SArray* pColumnIdList;
|
||||
} STqReadHandle;
|
||||
|
||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg);
|
||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList);
|
||||
void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver);
|
||||
bool tqNextDataBlock(STqReadHandle* pHandle);
|
||||
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
|
||||
// return SArray<SColumnInfoData>
|
||||
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList);
|
||||
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -606,7 +606,7 @@ int tqItemSSize() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||
SMqCVConsumeReq* pReq = pMsg->pCont;
|
||||
int64_t reqId = pReq->reqId;
|
||||
int64_t consumerId = pReq->consumerId;
|
||||
|
@ -623,6 +623,7 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
|||
int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1);
|
||||
if (old == 1) {
|
||||
// do nothing
|
||||
continue;
|
||||
}
|
||||
if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) {
|
||||
// TODO
|
||||
|
@ -635,7 +636,17 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
|||
|
||||
SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;
|
||||
|
||||
// TODO: launch query and get output data
|
||||
void* outputData;
|
||||
pHandle->buffer.output[pos].dst = outputData;
|
||||
if (pHandle->buffer.firstOffset == -1
|
||||
|| pReq->offset < pHandle->buffer.firstOffset) {
|
||||
pHandle->buffer.firstOffset = pReq->offset;
|
||||
}
|
||||
if (pHandle->buffer.lastOffset == -1
|
||||
|| pReq->offset > pHandle->buffer.lastOffset) {
|
||||
pHandle->buffer.lastOffset = pReq->offset;
|
||||
}
|
||||
atomic_store_8(&pHandle->buffer.output[pos].status, 1);
|
||||
|
||||
// put output into rsp
|
||||
|
@ -647,16 +658,62 @@ int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg) {
|
||||
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
|
||||
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
|
||||
if (pConsumer == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
|
||||
if (pTopic == NULL) {
|
||||
free(pConsumer);
|
||||
return -1;
|
||||
}
|
||||
strcpy(pTopic->topicName, pReq->topicName);
|
||||
strcpy(pTopic->cgroup, pReq->cgroup);
|
||||
strcpy(pTopic->sql, pReq->sql);
|
||||
strcpy(pTopic->logicalPlan, pReq->logicalPlan);
|
||||
strcpy(pTopic->physicalPlan, pReq->physicalPlan);
|
||||
SArray *pArray;
|
||||
//TODO: deserialize to SQueryDag
|
||||
SQueryDag *pDag;
|
||||
// convert to task
|
||||
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
|
||||
// TODO: handle error
|
||||
}
|
||||
ASSERT(taosArrayGetSize(pArray) == 0);
|
||||
STaskInfo *pInfo = taosArrayGet(pArray, 0);
|
||||
SArray* pTasks;
|
||||
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
|
||||
pTopic->buffer.firstOffset = -1;
|
||||
pTopic->buffer.lastOffset = -1;
|
||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
|
||||
pTopic->buffer.output[i].pMsg = pMsg;
|
||||
pTopic->buffer.output[i].status = 0;
|
||||
}
|
||||
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
|
||||
// write mq meta
|
||||
return 0;
|
||||
}
|
||||
|
||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList) {
|
||||
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
|
||||
if (pReadHandle == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pReadHandle->pMeta = pMeta;
|
||||
pReadHandle->pMsg = NULL;
|
||||
pReadHandle->ver = -1;
|
||||
pReadHandle->pColumnIdList = pColumnIdList;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
|
||||
pReadHandle->pMsg = pMsg;
|
||||
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
|
||||
pReadHandle->ver = -1;
|
||||
return NULL;
|
||||
pReadHandle->ver = ver;
|
||||
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
|
||||
}
|
||||
|
||||
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||
|
@ -676,7 +733,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
|
|||
// TODO: filter out unused column
|
||||
return 0;
|
||||
}
|
||||
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
|
||||
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||
int32_t sversion = pHandle->pBlock->sversion;
|
||||
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
|
||||
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
|
||||
|
@ -691,11 +748,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pTschema->numOfCols; i++) {
|
||||
// TODO: filter out unused column
|
||||
taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId));
|
||||
}
|
||||
|
||||
SMemRow row;
|
||||
int32_t kvIdx;
|
||||
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
||||
|
|
|
@ -58,7 +58,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
case TDMT_VND_TABLE_META:
|
||||
return vnodeGetTableMeta(pVnode, pMsg, pRsp);
|
||||
case TDMT_VND_CONSUME:
|
||||
return tqProcessConsume(pVnode->pTq, pMsg, pRsp);
|
||||
return tqProcessConsumeReq(pVnode->pTq, pMsg, pRsp);
|
||||
default:
|
||||
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||
return TSDB_CODE_VND_APP_ERROR;
|
||||
|
|
|
@ -13,8 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "vnd.h"
|
||||
#include "tq.h"
|
||||
#include "vnd.h"
|
||||
|
||||
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
switch (pMsg->msgType) {
|
||||
|
@ -34,7 +34,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
|||
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
||||
|
||||
// ser request version
|
||||
void * pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int64_t ver = pVnode->state.processed++;
|
||||
taosEncodeFixedU64(&pBuf, ver);
|
||||
|
||||
|
@ -53,7 +53,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
|||
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
SVCreateTbReq vCreateTbReq;
|
||||
SVCreateTbBatchReq vCreateTbBatchReq;
|
||||
void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
|
||||
void *ptr = vnodeMalloc(pVnode, pMsg->contLen);
|
||||
if (ptr == NULL) {
|
||||
// TODO: handle error
|
||||
}
|
||||
|
@ -110,43 +110,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
}
|
||||
break;
|
||||
case TDMT_VND_MQ_SET_CONN: {
|
||||
//TODO: wrap in a function
|
||||
char* reqStr = ptr;
|
||||
SMqSetCVgReq req;
|
||||
tDecodeSMqSetCVgReq(reqStr, &req);
|
||||
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
|
||||
|
||||
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
|
||||
if (pTopic == NULL) {
|
||||
// TODO: handle error
|
||||
tDecodeSMqSetCVgReq(ptr, &req);
|
||||
if (tqProcessSetConnReq(pVnode->pTq, &req) < 0) {
|
||||
}
|
||||
strcpy(pTopic->topicName, req.topicName);
|
||||
strcpy(pTopic->cgroup, req.cGroup);
|
||||
strcpy(pTopic->sql, req.sql);
|
||||
strcpy(pTopic->logicalPlan, req.logicalPlan);
|
||||
strcpy(pTopic->physicalPlan, req.physicalPlan);
|
||||
SArray *pArray;
|
||||
//TODO: deserialize to SQueryDag
|
||||
SQueryDag *pDag;
|
||||
// convert to task
|
||||
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
|
||||
// TODO: handle error
|
||||
}
|
||||
ASSERT(taosArrayGetSize(pArray) == 0);
|
||||
STaskInfo *pInfo = taosArrayGet(pArray, 0);
|
||||
SArray* pTasks;
|
||||
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
|
||||
pTopic->buffer.firstOffset = -1;
|
||||
pTopic->buffer.lastOffset = -1;
|
||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
|
||||
pTopic->buffer.output[i].pMsg = pMsg;
|
||||
pTopic->buffer.output[i].status = 0;
|
||||
}
|
||||
pTopic->pReadhandle = walOpenReadHandle(pVnode->pTq->pWal);
|
||||
// write mq meta
|
||||
}
|
||||
break;
|
||||
} break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue