tq handle multiple topic
This commit is contained in:
parent
f7726ce8a4
commit
51f02913f4
|
@ -1561,7 +1561,7 @@ typedef struct SMqSetCVgRsp {
|
|||
typedef struct SMqCVConsumeReq {
|
||||
int64_t reqId;
|
||||
int64_t offset;
|
||||
int64_t clientId;
|
||||
int64_t consumerId;
|
||||
int64_t blockingTime;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
|
|
|
@ -55,8 +55,6 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
|||
.deleteFp = (SdbDeleteFp)mndSubActionDelete};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
||||
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
|
||||
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
|
@ -95,7 +93,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
|
||||
pCEp->consumerId = consumerId;
|
||||
taosArrayPush(pSub->assigned, pCEp);
|
||||
pSub->nextConsumerIdx = (pSub->nextConsumerIdx+1) % taosArrayGetSize(pSub->availConsumer);
|
||||
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
|
||||
|
||||
// build msg
|
||||
SMqSetCVgReq req = {
|
||||
|
|
|
@ -162,7 +162,7 @@ typedef struct STqGroup {
|
|||
} STqGroup;
|
||||
|
||||
typedef struct STqTaskItem {
|
||||
int8_t status;
|
||||
int8_t status;
|
||||
int64_t offset;
|
||||
void* dst;
|
||||
SSubQueryMsg* pMsg;
|
||||
|
@ -175,18 +175,22 @@ typedef struct STqBuffer {
|
|||
STqTaskItem output[TQ_BUFFER_SIZE];
|
||||
} STqBuffer;
|
||||
|
||||
typedef struct STqClientHandle {
|
||||
int64_t clientId;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cGroup[TSDB_TOPIC_FNAME_LEN];
|
||||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
int64_t committedOffset;
|
||||
int64_t currentOffset;
|
||||
STqBuffer buffer;
|
||||
typedef struct STqTopicHandle {
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cGroup[TSDB_TOPIC_FNAME_LEN];
|
||||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
int64_t committedOffset;
|
||||
int64_t currentOffset;
|
||||
STqBuffer buffer;
|
||||
SWalReadHandle* pReadhandle;
|
||||
} STqClientHandle;
|
||||
} STqTopicHandle;
|
||||
|
||||
typedef struct STqConsumerHandle {
|
||||
int64_t consumerId;
|
||||
SArray* topics; // SArray<STqClientTopic>
|
||||
} STqConsumerHandle;
|
||||
|
||||
typedef struct STqQueryMsg {
|
||||
STqMsgItem* item;
|
||||
|
@ -337,7 +341,7 @@ int tqRegisterContext(STqGroup*, void* ahandle);
|
|||
int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
|
||||
#endif
|
||||
|
||||
int32_t tqProcessConsume(STQ* pTq, SRpcMsg *pMsg, SRpcMsg **ppRsp);
|
||||
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
|
||||
|
||||
typedef struct STqReadHandle {
|
||||
int64_t ver;
|
||||
|
|
|
@ -37,7 +37,7 @@ const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
|
|||
|
||||
int tqInit() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
|
||||
if(old == 1) return 0;
|
||||
if (old == 1) return 0;
|
||||
|
||||
tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
|
||||
return 0;
|
||||
|
@ -45,7 +45,7 @@ int tqInit() {
|
|||
|
||||
void tqCleanUp() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
|
||||
if(old == 0) return;
|
||||
if (old == 0) return;
|
||||
taosTmrStop(tqMgmt.timer);
|
||||
taosTmrCleanUp(tqMgmt.timer);
|
||||
}
|
||||
|
@ -150,7 +150,7 @@ int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup
|
|||
memset(pGroup, 0, sizeof(STqGroup));
|
||||
|
||||
pGroup->topicList = tdListNew(sizeof(STqTopic));
|
||||
if(pGroup->topicList == NULL) {
|
||||
if (pGroup->topicList == NULL) {
|
||||
free(pGroup);
|
||||
return -1;
|
||||
}
|
||||
|
@ -190,7 +190,7 @@ static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) {
|
|||
int totSize = 0;
|
||||
int numOfMsgs = 0;
|
||||
// TODO: make it a macro
|
||||
int sizeLimit = 4 * 1024;
|
||||
int sizeLimit = 4 * 1024;
|
||||
|
||||
void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit);
|
||||
if (ptr == NULL) {
|
||||
|
@ -329,9 +329,9 @@ int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) {
|
|||
}
|
||||
|
||||
int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
||||
STqConsumeReq *pMsg = pReq->pCont;
|
||||
int64_t clientId = pMsg->head.clientId;
|
||||
STqGroup* pGroup = tqGetGroup(pTq, clientId);
|
||||
STqConsumeReq* pMsg = pReq->pCont;
|
||||
int64_t clientId = pMsg->head.clientId;
|
||||
STqGroup* pGroup = tqGetGroup(pTq, clientId);
|
||||
if (pGroup == NULL) {
|
||||
terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
|
||||
return -1;
|
||||
|
@ -343,9 +343,8 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
|||
int numOfMsgs = 0;
|
||||
int sizeLimit = 4096;
|
||||
|
||||
|
||||
STqConsumeRsp *pCsmRsp = (*pRsp)->pCont;
|
||||
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit);
|
||||
STqConsumeRsp* pCsmRsp = (*pRsp)->pCont;
|
||||
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit);
|
||||
if (ptr == NULL) {
|
||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -356,16 +355,16 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
|||
tdListInitIter(topicList, &iter, TD_LIST_FORWARD);
|
||||
|
||||
STqMsgContent* buffer = NULL;
|
||||
SArray* pArray = taosArrayInit(0, sizeof(void*));
|
||||
SArray* pArray = taosArrayInit(0, sizeof(void*));
|
||||
|
||||
SListNode *pn;
|
||||
while((pn = tdListNext(&iter)) != NULL) {
|
||||
STqTopic* pTopic = *(STqTopic**)pn->data;
|
||||
int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE;
|
||||
SListNode* pn;
|
||||
while ((pn = tdListNext(&iter)) != NULL) {
|
||||
STqTopic* pTopic = *(STqTopic**)pn->data;
|
||||
int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE;
|
||||
STqMsgItem* pItem = &pTopic->buffer[idx];
|
||||
if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) {
|
||||
if(pItem->status == TQ_ITEM_READY) {
|
||||
//if has data
|
||||
if (pItem->status == TQ_ITEM_READY) {
|
||||
// if has data
|
||||
totSize += pTopic->buffer[idx].size;
|
||||
if (totSize > sizeLimit) {
|
||||
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize);
|
||||
|
@ -388,13 +387,13 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
|||
if (totSize > sizeLimit) {
|
||||
break;
|
||||
}
|
||||
} else if(pItem->status == TQ_ITEM_PROCESS) {
|
||||
//if not have data but in process
|
||||
} else if (pItem->status == TQ_ITEM_PROCESS) {
|
||||
// if not have data but in process
|
||||
|
||||
} else if(pItem->status == TQ_ITEM_EMPTY){
|
||||
//if not have data and not in process
|
||||
} else if (pItem->status == TQ_ITEM_EMPTY) {
|
||||
// if not have data and not in process
|
||||
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS);
|
||||
if(old != TQ_ITEM_EMPTY) {
|
||||
if (old != TQ_ITEM_EMPTY) {
|
||||
continue;
|
||||
}
|
||||
pItem->offset = pTopic->floatingCursor;
|
||||
|
@ -416,22 +415,22 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
|||
}
|
||||
|
||||
// fetched a num of msgs, rpc response
|
||||
for(int i = 0; i < pArray->size; i++) {
|
||||
for (int i = 0; i < pArray->size; i++) {
|
||||
STqMsgItem* pItem = taosArrayGet(pArray, i);
|
||||
|
||||
//read from wal
|
||||
// read from wal
|
||||
void* raw = NULL;
|
||||
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
|
||||
int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);
|
||||
if(code < 0) {
|
||||
//TODO: error
|
||||
if (code < 0) {
|
||||
// TODO: error
|
||||
}
|
||||
//get msgType
|
||||
//if submitblk
|
||||
// get msgType
|
||||
// if submitblk
|
||||
pItem->executor->assign(pItem->executor->runtimeEnv, raw);
|
||||
SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv);
|
||||
pItem->content = content;
|
||||
//if other type, send just put into buffer
|
||||
// if other type, send just put into buffer
|
||||
/*pItem->content = raw;*/
|
||||
|
||||
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY);
|
||||
|
@ -608,41 +607,48 @@ int tqItemSSize() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg **ppRsp) {
|
||||
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||
SMqCVConsumeReq* pReq = pMsg->pCont;
|
||||
int64_t reqId = pReq->reqId;
|
||||
int64_t clientId = pReq->clientId;
|
||||
int64_t offset = pReq->offset;
|
||||
int64_t blockingTime = pReq->blockingTime;
|
||||
int64_t reqId = pReq->reqId;
|
||||
int64_t consumerId = pReq->consumerId;
|
||||
int64_t offset = pReq->offset;
|
||||
int64_t blockingTime = pReq->blockingTime;
|
||||
|
||||
STqClientHandle* pHandle = tqHandleGet(pTq->tqMeta, clientId);
|
||||
int8_t pos = offset % TQ_BUFFER_SIZE;
|
||||
int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1);
|
||||
if (old == 1) {
|
||||
// do nothing
|
||||
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
|
||||
int sz = taosArrayGetSize(pConsumer->topics);
|
||||
|
||||
for (int i = 0 ; i < sz; i++) {
|
||||
STqTopicHandle *pHandle = taosArrayGet(pConsumer->topics, i);
|
||||
|
||||
int8_t pos = offset % TQ_BUFFER_SIZE;
|
||||
int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1);
|
||||
if (old == 1) {
|
||||
// do nothing
|
||||
}
|
||||
if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) {
|
||||
// TODO
|
||||
}
|
||||
SWalHead* pHead = pHandle->pReadhandle->pHead;
|
||||
while (pHead->head.msgType != TDMT_VND_SUBMIT) {
|
||||
// read until find TDMT_VND_SUBMIT
|
||||
}
|
||||
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
|
||||
|
||||
SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;
|
||||
|
||||
void* outputData;
|
||||
atomic_store_8(&pHandle->buffer.output[pos].status, 1);
|
||||
|
||||
// put output into rsp
|
||||
}
|
||||
if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) {
|
||||
//TODO
|
||||
}
|
||||
SWalHead *pHead = pHandle->pReadhandle->pHead;
|
||||
while (pHead->head.msgType != TDMT_VND_SUBMIT) {
|
||||
// read until find TDMT_VND_SUBMIT
|
||||
}
|
||||
SSubmitMsg *pCont = (SSubmitMsg*)&pHead->head.body;
|
||||
|
||||
SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;
|
||||
|
||||
void* outputData;
|
||||
|
||||
atomic_store_8(&pHandle->buffer.output[pos].status, 1);
|
||||
// launch query
|
||||
// get result
|
||||
// put into
|
||||
SMqCvConsumeRsp* pRsp;
|
||||
return 0;
|
||||
}
|
||||
|
||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) {
|
||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg) {
|
||||
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
|
||||
if (pReadHandle == NULL) {
|
||||
return NULL;
|
||||
|
@ -655,39 +661,39 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) {
|
|||
}
|
||||
|
||||
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||
if(tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
||||
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
|
||||
SMemRow row;
|
||||
int32_t sversion = pHandle->pBlock->sversion;
|
||||
SMemRow row;
|
||||
int32_t sversion = pHandle->pBlock->sversion;
|
||||
SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);
|
||||
pBlockInfo->numOfCols = pSchema->nCols;
|
||||
pBlockInfo->rows = pHandle->pBlock->numOfRows;
|
||||
pBlockInfo->uid = pHandle->pBlock->uid;
|
||||
//TODO: filter out unused column
|
||||
// TODO: filter out unused column
|
||||
return 0;
|
||||
}
|
||||
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
|
||||
int32_t sversion = pHandle->pBlock->sversion;
|
||||
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
|
||||
int32_t sversion = pHandle->pBlock->sversion;
|
||||
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
|
||||
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
|
||||
SArray *pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
|
||||
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
|
||||
SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
|
||||
if (pArray == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
SColumnInfoData colInfo;
|
||||
int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
|
||||
int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
|
||||
colInfo.pData = malloc(sz);
|
||||
if (colInfo.pData == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pTschema->numOfCols; i++) {
|
||||
//TODO: filter out unused column
|
||||
// TODO: filter out unused column
|
||||
taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId));
|
||||
}
|
||||
|
||||
|
@ -695,16 +701,17 @@ SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
|
|||
int32_t kvIdx;
|
||||
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
||||
for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
|
||||
//TODO: filter out unused column
|
||||
STColumn *pCol = schemaColAt(pTschema, i);
|
||||
// TODO: filter out unused column
|
||||
STColumn* pCol = schemaColAt(pTschema, i);
|
||||
void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
|
||||
//TODO: handle varlen
|
||||
// TODO: handle varlen
|
||||
memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
|
||||
}
|
||||
}
|
||||
taosArrayPush(pArray, &colInfo);
|
||||
return pArray;
|
||||
}
|
||||
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status) {*/
|
||||
/*return 0;*/
|
||||
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t
|
||||
* status) {*/
|
||||
/*return 0;*/
|
||||
/*}*/
|
||||
|
|
Loading…
Reference in New Issue