Merge pull request #9924 from taosdata/feature/tq

add tq process consumer msg
This commit is contained in:
Liu Jicong 2022-01-20 15:34:28 +08:00 committed by GitHub
commit b899c60a9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 155 additions and 110 deletions

View File

@ -1527,7 +1527,6 @@ typedef struct SMqSetCVgReq {
SArray* tasks; // SArray<SSubQueryMsg> SArray* tasks; // SArray<SSubQueryMsg>
} SMqSetCVgReq; } SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI32(buf, pReq->vgId);
@ -1552,14 +1551,28 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
return buf; return buf;
} }
typedef struct SMqSetCVgRsp {
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_CONSUMER_GROUP_LEN];
} SMqSetCVgRsp;
typedef struct SMqCVConsumeReq { typedef struct SMqCVConsumeReq {
int64_t reqId; int64_t reqId;
int64_t offset; int64_t offset;
int64_t clientId; int64_t consumerId;
int64_t blockingTime;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqCVConsumeReq; } SMqCVConsumeReq;
typedef struct SMqConsumeRspBlock {
int32_t bodyLen;
char topicName[TSDB_TOPIC_FNAME_LEN];
char body[];
} SMqConsumeRspBlock;
typedef struct SMqCVConsumeRsp { typedef struct SMqCVConsumeRsp {
int64_t reqId; int64_t reqId;
int64_t clientId; int64_t clientId;
@ -1569,7 +1582,7 @@ typedef struct SMqCVConsumeRsp {
int32_t skipLogNum; int32_t skipLogNum;
int32_t bodyLen; int32_t bodyLen;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char body[]; SMqConsumeRspBlock blocks[];
} SMqCvConsumeRsp; } SMqCvConsumeRsp;
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -160,7 +160,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)

View File

@ -80,8 +80,8 @@ typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2 } EWal
typedef struct SWalReadHead { typedef struct SWalReadHead {
int8_t headVer; int8_t headVer;
uint8_t msgType; int16_t msgType;
int8_t reserved[2]; int8_t reserved;
int32_t len; int32_t len;
int64_t ingestTs; // not implemented int64_t ingestTs; // not implemented
int64_t version; int64_t version;

View File

@ -55,8 +55,6 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndSubActionDelete}; .deleteFp = (SdbDeleteFp)mndSubActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
@ -95,7 +93,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
pCEp->consumerId = consumerId; pCEp->consumerId = consumerId;
taosArrayPush(pSub->assigned, pCEp); taosArrayPush(pSub->assigned, pCEp);
pSub->nextConsumerIdx++; pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
// build msg // build msg
SMqSetCVgReq req = { SMqSetCVgReq req = {

View File

@ -27,6 +27,7 @@
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "wal.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -161,7 +162,8 @@ typedef struct STqGroup {
} STqGroup; } STqGroup;
typedef struct STqTaskItem { typedef struct STqTaskItem {
int32_t status; int8_t status;
int64_t offset;
void* dst; void* dst;
SSubQueryMsg* pMsg; SSubQueryMsg* pMsg;
} STqTaskItem; } STqTaskItem;
@ -173,8 +175,7 @@ typedef struct STqBuffer {
STqTaskItem output[TQ_BUFFER_SIZE]; STqTaskItem output[TQ_BUFFER_SIZE];
} STqBuffer; } STqBuffer;
typedef struct STqClientHandle { typedef struct STqTopicHandle {
int64_t clientId;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_TOPIC_FNAME_LEN]; char cGroup[TSDB_TOPIC_FNAME_LEN];
char* sql; char* sql;
@ -183,24 +184,20 @@ typedef struct STqClientHandle {
int64_t committedOffset; int64_t committedOffset;
int64_t currentOffset; int64_t currentOffset;
STqBuffer buffer; STqBuffer buffer;
} STqClientHandle; SWalReadHandle* pReadhandle;
} STqTopicHandle;
typedef struct STqConsumerHandle {
int64_t consumerId;
int64_t epoch;
SArray* topics; // SArray<STqClientTopic>
} STqConsumerHandle;
typedef struct STqQueryMsg { typedef struct STqQueryMsg {
STqMsgItem* item; STqMsgItem* item;
struct STqQueryMsg* next; struct STqQueryMsg* next;
} STqQueryMsg; } STqQueryMsg;
typedef struct STqLogHandle {
void* logHandle;
void* (*openLogReader)(void* logHandle);
void (*closeLogReader)(void* logReader);
int32_t (*logRead)(void* logReader, void** data, int64_t ver);
int64_t (*logGetFirstVer)(void* logHandle);
int64_t (*logGetSnapshotVer)(void* logHandle);
int64_t (*logGetLastVer)(void* logHandle);
} STqLogHandle;
typedef struct STqCfg { typedef struct STqCfg {
// TODO // TODO
} STqCfg; } STqCfg;
@ -298,7 +295,6 @@ typedef struct STQ {
// the handle of meta kvstore // the handle of meta kvstore
char* path; char* path;
STqCfg* tqConfig; STqCfg* tqConfig;
STqLogHandle* tqLogHandle;
STqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
} STQ; } STQ;
@ -315,31 +311,26 @@ int tqInit();
void tqCleanUp(); void tqCleanUp();
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac); STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
void tqClose(STQ*); void tqClose(STQ*);
// void* will be replace by a msg type // void* will be replace by a msg type
int tqPushMsg(STQ*, void* msg, int64_t version); int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp);
#if 0
int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp);
int tqSetCursor(STQ*, STqSetCurReq* pMsg); int tqSetCursor(STQ*, STqSetCurReq* pMsg);
int tqBufferSetOffset(STqTopic*, int64_t offset); int tqBufferSetOffset(STqTopic*, int64_t offset);
STqTopic* tqFindTopic(STqGroup*, int64_t topicId); STqTopic* tqFindTopic(STqGroup*, int64_t topicId);
STqGroup* tqGetGroup(STQ*, int64_t clientId); STqGroup* tqGetGroup(STQ*, int64_t clientId);
STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqRegisterContext(STqGroup*, void* ahandle); int tqRegisterContext(STqGroup*, void* ahandle);
int tqSendLaunchQuery(STqMsgItem*, int64_t offset); int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
#endif
int tqSerializeGroup(const STqGroup*, STqSerializedHead**); int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
static int tqQueryExecuting(int32_t status) { return status; }
typedef struct STqReadHandle { typedef struct STqReadHandle {
int64_t ver; int64_t ver;
@ -350,9 +341,6 @@ typedef struct STqReadHandle {
SMeta* pMeta; SMeta* pMeta;
} STqReadHandle; } STqReadHandle;
typedef struct SSubmitBlkScanInfo {
} SSubmitBlkScanInfo;
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg); STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg);
bool tqNextDataBlock(STqReadHandle* pHandle); bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo); int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);

View File

@ -43,6 +43,9 @@ extern int32_t tqDebugFlag;
// delete persistent storage for meta info // delete persistent storage for meta info
// int tqDropTCGroup(STQ*, const char* topic, int cgId); // int tqDropTCGroup(STQ*, const char* topic, int cgId);
int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup);
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -37,7 +37,7 @@ const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
int tqInit() { int tqInit() {
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1); int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
if(old == 1) return 0; if (old == 1) return 0;
tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ"); tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
return 0; return 0;
@ -45,12 +45,12 @@ int tqInit() {
void tqCleanUp() { void tqCleanUp() {
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0); int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
if(old == 0) return; if (old == 0) return;
taosTmrStop(tqMgmt.timer); taosTmrStop(tqMgmt.timer);
taosTmrCleanUp(tqMgmt.timer); taosTmrCleanUp(tqMgmt.timer);
} }
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac) { STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
STQ* pTq = malloc(sizeof(STQ)); STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
@ -58,7 +58,6 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemA
} }
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->tqConfig = tqConfig; pTq->tqConfig = tqConfig;
pTq->tqLogHandle = tqLogHandle;
#if 0 #if 0
pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocatorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac); pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
@ -150,7 +149,7 @@ int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup
memset(pGroup, 0, sizeof(STqGroup)); memset(pGroup, 0, sizeof(STqGroup));
pGroup->topicList = tdListNew(sizeof(STqTopic)); pGroup->topicList = tdListNew(sizeof(STqTopic));
if(pGroup->topicList == NULL) { if (pGroup->topicList == NULL) {
free(pGroup); free(pGroup);
return -1; return -1;
} }
@ -329,7 +328,7 @@ int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) {
} }
int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
STqConsumeReq *pMsg = pReq->pCont; STqConsumeReq* pMsg = pReq->pCont;
int64_t clientId = pMsg->head.clientId; int64_t clientId = pMsg->head.clientId;
STqGroup* pGroup = tqGetGroup(pTq, clientId); STqGroup* pGroup = tqGetGroup(pTq, clientId);
if (pGroup == NULL) { if (pGroup == NULL) {
@ -343,8 +342,7 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
int numOfMsgs = 0; int numOfMsgs = 0;
int sizeLimit = 4096; int sizeLimit = 4096;
STqConsumeRsp* pCsmRsp = (*pRsp)->pCont;
STqConsumeRsp *pCsmRsp = (*pRsp)->pCont;
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit); void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
@ -358,14 +356,14 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
STqMsgContent* buffer = NULL; STqMsgContent* buffer = NULL;
SArray* pArray = taosArrayInit(0, sizeof(void*)); SArray* pArray = taosArrayInit(0, sizeof(void*));
SListNode *pn; SListNode* pn;
while((pn = tdListNext(&iter)) != NULL) { while ((pn = tdListNext(&iter)) != NULL) {
STqTopic* pTopic = *(STqTopic**)pn->data; STqTopic* pTopic = *(STqTopic**)pn->data;
int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE; int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE;
STqMsgItem* pItem = &pTopic->buffer[idx]; STqMsgItem* pItem = &pTopic->buffer[idx];
if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) { if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) {
if(pItem->status == TQ_ITEM_READY) { if (pItem->status == TQ_ITEM_READY) {
//if has data // if has data
totSize += pTopic->buffer[idx].size; totSize += pTopic->buffer[idx].size;
if (totSize > sizeLimit) { if (totSize > sizeLimit) {
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize); void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize);
@ -388,13 +386,13 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
if (totSize > sizeLimit) { if (totSize > sizeLimit) {
break; break;
} }
} else if(pItem->status == TQ_ITEM_PROCESS) { } else if (pItem->status == TQ_ITEM_PROCESS) {
//if not have data but in process // if not have data but in process
} else if(pItem->status == TQ_ITEM_EMPTY){ } else if (pItem->status == TQ_ITEM_EMPTY) {
//if not have data and not in process // if not have data and not in process
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS); int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS);
if(old != TQ_ITEM_EMPTY) { if (old != TQ_ITEM_EMPTY) {
continue; continue;
} }
pItem->offset = pTopic->floatingCursor; pItem->offset = pTopic->floatingCursor;
@ -416,22 +414,22 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
} }
// fetched a num of msgs, rpc response // fetched a num of msgs, rpc response
for(int i = 0; i < pArray->size; i++) { for (int i = 0; i < pArray->size; i++) {
STqMsgItem* pItem = taosArrayGet(pArray, i); STqMsgItem* pItem = taosArrayGet(pArray, i);
//read from wal // read from wal
void* raw = NULL; void* raw = NULL;
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/ /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset); /*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/
if(code < 0) { /*if (code < 0) {*/
//TODO: error // TODO: error
} /*}*/
//get msgType // get msgType
//if submitblk // if submitblk
pItem->executor->assign(pItem->executor->runtimeEnv, raw); pItem->executor->assign(pItem->executor->runtimeEnv, raw);
SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv); SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv);
pItem->content = content; pItem->content = content;
//if other type, send just put into buffer // if other type, send just put into buffer
/*pItem->content = raw;*/ /*pItem->content = raw;*/
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY); int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY);
@ -608,7 +606,48 @@ int tqItemSSize() {
return 0; return 0;
} }
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) { int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
SMqCVConsumeReq* pReq = pMsg->pCont;
int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId;
int64_t offset = pReq->offset;
int64_t blockingTime = pReq->blockingTime;
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
int sz = taosArrayGetSize(pConsumer->topics);
for (int i = 0 ; i < sz; i++) {
STqTopicHandle *pHandle = taosArrayGet(pConsumer->topics, i);
int8_t pos = offset % TQ_BUFFER_SIZE;
int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1);
if (old == 1) {
// do nothing
}
if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) {
// TODO
}
SWalHead* pHead = pHandle->pReadhandle->pHead;
while (pHead->head.msgType != TDMT_VND_SUBMIT) {
// read until find TDMT_VND_SUBMIT
}
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;
void* outputData;
atomic_store_8(&pHandle->buffer.output[pos].status, 1);
// put output into rsp
}
// launch query
// get result
SMqCvConsumeRsp* pRsp;
return 0;
}
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg) {
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
if (pReadHandle == NULL) { if (pReadHandle == NULL) {
return NULL; return NULL;
@ -621,7 +660,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) {
} }
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
if(tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false; return false;
} }
return true; return true;
@ -634,14 +673,14 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
pBlockInfo->numOfCols = pSchema->nCols; pBlockInfo->numOfCols = pSchema->nCols;
pBlockInfo->rows = pHandle->pBlock->numOfRows; pBlockInfo->rows = pHandle->pBlock->numOfRows;
pBlockInfo->uid = pHandle->pBlock->uid; pBlockInfo->uid = pHandle->pBlock->uid;
//TODO: filter out unused column // TODO: filter out unused column
return 0; return 0;
} }
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
int32_t sversion = pHandle->pBlock->sversion; int32_t sversion = pHandle->pBlock->sversion;
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion); STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
SArray *pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
if (pArray == NULL) { if (pArray == NULL) {
return NULL; return NULL;
} }
@ -653,7 +692,7 @@ SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
} }
for (int i = 0; i < pTschema->numOfCols; i++) { for (int i = 0; i < pTschema->numOfCols; i++) {
//TODO: filter out unused column // TODO: filter out unused column
taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId)); taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId));
} }
@ -661,16 +700,17 @@ SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
int32_t kvIdx; int32_t kvIdx;
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) { for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
//TODO: filter out unused column // TODO: filter out unused column
STColumn *pCol = schemaColAt(pTschema, i); STColumn* pCol = schemaColAt(pTschema, i);
void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
//TODO: handle varlen // TODO: handle varlen
memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes); memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
} }
} }
taosArrayPush(pArray, &colInfo); taosArrayPush(pArray, &colInfo);
return pArray; return pArray;
} }
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status) {*/ /*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t
/*return 0;*/ * status) {*/
/*return 0;*/
/*}*/ /*}*/

View File

@ -117,9 +117,9 @@ static int vnodeOpenImpl(SVnode *pVnode) {
return -1; return -1;
} }
// TODO: Open TQ // Open TQ
sprintf(dir, "%s/tq", pVnode->path); sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, vBufPoolGetMAF(pVnode)); pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;

View File

@ -58,7 +58,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg, pRsp); return vnodeGetTableMeta(pVnode, pMsg, pRsp);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return 0; return tqProcessConsume(pVnode->pTq, pMsg, pRsp);
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;

View File

@ -110,18 +110,21 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
break; break;
case TDMT_VND_MQ_SET_CONN: { case TDMT_VND_MQ_SET_CONN: {
//TODO: wrap in a function
char* reqStr = ptr; char* reqStr = ptr;
SMqSetCVgReq req; SMqSetCVgReq req;
tDecodeSMqSetCVgReq(reqStr, &req); tDecodeSMqSetCVgReq(reqStr, &req);
STqClientHandle* pHandle = calloc(sizeof(STqClientHandle), 1); STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
if (pHandle == NULL) {
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
if (pTopic == NULL) {
// TODO: handle error // TODO: handle error
} }
strcpy(pHandle->topicName, req.topicName); strcpy(pTopic->topicName, req.topicName);
strcpy(pHandle->cGroup, req.cGroup); strcpy(pTopic->cGroup, req.cGroup);
strcpy(pHandle->sql, req.sql); strcpy(pTopic->sql, req.sql);
strcpy(pHandle->logicalPlan, req.logicalPlan); strcpy(pTopic->logicalPlan, req.logicalPlan);
strcpy(pHandle->physicalPlan, req.physicalPlan); strcpy(pTopic->physicalPlan, req.physicalPlan);
SArray *pArray; SArray *pArray;
//TODO: deserialize to SQueryDag //TODO: deserialize to SQueryDag
SQueryDag *pDag; SQueryDag *pDag;
@ -133,12 +136,12 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
STaskInfo *pInfo = taosArrayGet(pArray, 0); STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks; SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
pHandle->buffer.firstOffset = -1; pTopic->buffer.firstOffset = -1;
pHandle->buffer.lastOffset = -1; pTopic->buffer.lastOffset = -1;
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
pHandle->buffer.output[i].pMsg = pMsg; pTopic->buffer.output[i].pMsg = pMsg;
pHandle->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
} }
// write mq meta // write mq meta
} }