Merge pull request #9957 from taosdata/feature/tq

fix dependency for vnode
This commit is contained in:
Liu Jicong 2022-01-21 16:37:35 +08:00 committed by GitHub
commit 95b96c3672
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 110 additions and 73 deletions

View File

@ -1529,17 +1529,22 @@ typedef struct SMqSetCVgReq {
} SMqSetCVgReq; } SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
int32_t tlen = sizeof(SSubQueryMsg) + pMsg->contentLen; int32_t tlen = 0;
if (buf == NULL) return tlen; tlen += taosEncodeFixedU64(buf, pMsg->sId);
memcpy(*buf, pMsg, tlen); tlen += taosEncodeFixedU64(buf, pMsg->queryId);
*buf = POINTER_SHIFT(*buf, tlen); tlen += taosEncodeFixedU64(buf, pMsg->taskId);
tlen += taosEncodeFixedU32(buf, pMsg->contentLen);
tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen);
return tlen; return tlen;
} }
static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
int32_t tlen = sizeof(SSubQueryMsg) + ((SSubQueryMsg*)buf)->contentLen; buf = taosDecodeFixedU64(buf, &pMsg->sId);
memcpy(pMsg, buf, tlen); buf = taosDecodeFixedU64(buf, &pMsg->queryId);
return POINTER_SHIFT(buf, tlen); buf = taosDecodeFixedU64(buf, &pMsg->taskId);
buf = taosDecodeFixedU32(buf, &pMsg->contentLen);
buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen);
return buf;
} }
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {

View File

@ -372,7 +372,7 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) {
} }
// ---- binary // ---- binary
static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int valueLen) { static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int32_t valueLen) {
int tlen = 0; int tlen = 0;
if (buf != NULL) { if (buf != NULL) {
@ -384,14 +384,19 @@ static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int valu
return tlen; return tlen;
} }
static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int valueLen) { static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int32_t valueLen) {
uint64_t size = 0;
*value = malloc((size_t)valueLen); *value = malloc((size_t)valueLen);
if (*value == NULL) return NULL; if (*value == NULL) return NULL;
memcpy(*value, buf, (size_t)size); memcpy(*value, buf, (size_t)valueLen);
return POINTER_SHIFT(buf, size); return POINTER_SHIFT(buf, valueLen);
}
static FORCE_INLINE void *taosDecodeBinaryTo(void *buf, void *value, int32_t valueLen) {
memcpy(value, buf, (size_t)valueLen);
return POINTER_SHIFT(buf, valueLen);
} }
#endif #endif

View File

@ -17,11 +17,12 @@
#define _TD_TQ_H_ #define _TD_TQ_H_
#include "common.h" #include "common.h"
#include "executor.h"
#include "vnode.h"
#include "mallocator.h" #include "mallocator.h"
#include "meta.h" #include "meta.h"
#include "os.h" #include "os.h"
#include "scheduler.h" #include "scheduler.h"
#include "executor.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlist.h" #include "tlist.h"
#include "tmsg.h" #include "tmsg.h"
@ -184,10 +185,6 @@ typedef struct STqQueryMsg {
struct STqQueryMsg* next; struct STqQueryMsg* next;
} STqQueryMsg; } STqQueryMsg;
typedef struct STqCfg {
// TODO
} STqCfg;
typedef struct STqMemRef { typedef struct STqMemRef {
SMemAllocatorFactory* pAllocatorFactory; SMemAllocatorFactory* pAllocatorFactory;
SMemAllocator* pAllocator; SMemAllocator* pAllocator;
@ -284,6 +281,7 @@ typedef struct STQ {
STqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
SWal* pWal; SWal* pWal;
SMeta* pMeta;
} STQ; } STQ;
typedef struct STqMgmt { typedef struct STqMgmt {
@ -298,7 +296,7 @@ int tqInit();
void tqCleanUp(); void tqCleanUp();
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
void tqClose(STQ*); void tqClose(STQ*);
// void* will be replace by a msg type // void* will be replace by a msg type
@ -320,23 +318,6 @@ int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq); int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq);
typedef struct STqReadHandle {
int64_t ver;
SSubmitMsg* pMsg;
SSubmitBlk* pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta* pMeta;
SArray* pColumnIdList;
} STqReadHandle;
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList);
void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
// return SArray<SColumnInfoData>
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -22,7 +22,6 @@
#include "meta.h" #include "meta.h"
#include "tarray.h" #include "tarray.h"
#include "tfs.h" #include "tfs.h"
#include "tq.h"
#include "tsdb.h" #include "tsdb.h"
#include "wal.h" #include "wal.h"
@ -35,6 +34,12 @@ typedef struct SVnode SVnode;
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef struct STqCfg {
// TODO
int32_t reserved;
} STqCfg;
typedef struct SVnodeCfg { typedef struct SVnodeCfg {
int32_t vgId; int32_t vgId;
SDnode *pDnode; SDnode *pDnode;
@ -61,6 +66,16 @@ typedef struct {
PutReqToVQueryQFp putReqToVQueryQFp; PutReqToVQueryQFp putReqToVQueryQFp;
} SVnodeOpt; } SVnodeOpt;
typedef struct STqReadHandle {
int64_t ver;
SSubmitMsg* pMsg;
SSubmitBlk* pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta* pMeta;
SArray* pColumnIdList;
} STqReadHandle;
/* ------------------------ SVnode ------------------------ */ /* ------------------------ SVnode ------------------------ */
/** /**
* @brief Initialize the vnode module * @brief Initialize the vnode module
@ -180,6 +195,21 @@ int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
/* ------------------------- TQ QUERY -------------------------- */
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta);
static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColumnIdList) {
pReadHandle->pColumnIdList = pColumnIdList;
}
void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
// return SArray<SColumnInfoData>
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -17,6 +17,7 @@
#define _TD_TQ_INT_H_ #define _TD_TQ_INT_H_
#include "tq.h" #include "tq.h"
#include "meta.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -24,6 +24,7 @@
#include "tlockfree.h" #include "tlockfree.h"
#include "tmacro.h" #include "tmacro.h"
#include "wal.h" #include "wal.h"
#include "tq.h"
#include "vnode.h" #include "vnode.h"

View File

@ -50,7 +50,7 @@ void tqCleanUp() {
taosTmrCleanUp(tqMgmt.timer); taosTmrCleanUp(tqMgmt.timer);
} }
STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
STQ* pTq = malloc(sizeof(STQ)); STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
@ -58,6 +58,8 @@ STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory
} }
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->tqConfig = tqConfig; pTq->tqConfig = tqConfig;
pTq->pWal = pWal;
pTq->pMeta = pMeta;
#if 0 #if 0
pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocatorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac); pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
@ -610,30 +612,34 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
SMqCVConsumeReq* pReq = pMsg->pCont; SMqCVConsumeReq* pReq = pMsg->pCont;
int64_t reqId = pReq->reqId; int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t offset = pReq->offset; int64_t reqOffset = pReq->offset;
int64_t fetchOffset = reqOffset;
int64_t blockingTime = pReq->blockingTime; int64_t blockingTime = pReq->blockingTime;
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
int sz = taosArrayGetSize(pConsumer->topics); int sz = taosArrayGetSize(pConsumer->topics);
for (int i = 0 ; i < sz; i++) { for (int i = 0 ; i < sz; i++) {
STqTopicHandle *pHandle = taosArrayGet(pConsumer->topics, i); STqTopicHandle *pTopic = taosArrayGet(pConsumer->topics, i);
int8_t pos = offset % TQ_BUFFER_SIZE; int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1); int8_t old = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1);
if (old == 1) { if (old == 1) {
// do nothing // do nothing
continue; continue;
} }
if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) { if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
// TODO return -1;
} }
SWalHead* pHead = pHandle->pReadhandle->pHead; SWalHead* pHead = pTopic->pReadhandle->pHead;
while (pHead->head.msgType != TDMT_VND_SUBMIT) { while (1) {
// read until find TDMT_VND_SUBMIT // read until find TDMT_VND_SUBMIT
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
return -1;
}
} }
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
void* task = pHandle->buffer.output[pos].task; void* task = pTopic->buffer.output[pos].task;
qSetStreamInput(task, pCont); qSetStreamInput(task, pCont);
SSDataBlock* pDataBlock; SSDataBlock* pDataBlock;
@ -642,16 +648,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
} }
// TODO: launch query and get output data // TODO: launch query and get output data
pHandle->buffer.output[pos].dst = pDataBlock; pTopic->buffer.output[pos].dst = pDataBlock;
if (pHandle->buffer.firstOffset == -1 if (pTopic->buffer.firstOffset == -1
|| pReq->offset < pHandle->buffer.firstOffset) { || pReq->offset < pTopic->buffer.firstOffset) {
pHandle->buffer.firstOffset = pReq->offset; pTopic->buffer.firstOffset = pReq->offset;
} }
if (pHandle->buffer.lastOffset == -1 if (pTopic->buffer.lastOffset == -1
|| pReq->offset > pHandle->buffer.lastOffset) { || pReq->offset > pTopic->buffer.lastOffset) {
pHandle->buffer.lastOffset = pReq->offset; pTopic->buffer.lastOffset = pReq->offset;
} }
atomic_store_8(&pHandle->buffer.output[pos].status, 1); atomic_store_8(&pTopic->buffer.output[pos].status, 1);
// put output into rsp // put output into rsp
} }
@ -681,16 +687,20 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
pTopic->buffer.firstOffset = -1; pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1; pTopic->buffer.lastOffset = -1;
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
if (pTopic->pReadhandle == NULL) {
}
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, NULL); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, pReadHandle);
} }
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
// write mq meta // write mq meta
return 0; return 0;
} }
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList) { STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
if (pReadHandle == NULL) { if (pReadHandle == NULL) {
return NULL; return NULL;
@ -698,7 +708,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList) {
pReadHandle->pMeta = pMeta; pReadHandle->pMeta = pMeta;
pReadHandle->pMsg = NULL; pReadHandle->pMsg = NULL;
pReadHandle->ver = -1; pReadHandle->ver = -1;
pReadHandle->pColumnIdList = pColumnIdList; pReadHandle->pColumnIdList = NULL;
return NULL; return NULL;
} }

View File

@ -127,7 +127,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open TQ // Open TQ
sprintf(dir, "%s/tq", pVnode->path); sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, pVnode->pWal, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); pVnode->pTq = tqOpen(dir, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;

View File

@ -66,10 +66,12 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle
} }
// print those info into log // print those info into log
pMsg->sId = be64toh(pMsg->sId); #if 0
pMsg->queryId = be64toh(pMsg->queryId); pMsg->sId = pMsg->sId;
pMsg->taskId = be64toh(pMsg->taskId); pMsg->queryId = pMsg->queryId;
pMsg->contentLen = ntohl(pMsg->contentLen); pMsg->taskId = pMsg->taskId;
pMsg->contentLen = pMsg->contentLen;
#endif
struct SSubplan* plan = NULL; struct SSubplan* plan = NULL;
int32_t code = qStringToSubplan(pMsg->msg, &plan); int32_t code = qStringToSubplan(pMsg->msg, &plan);

View File

@ -27,6 +27,7 @@
#include "thash.h" #include "thash.h"
#include "ttypes.h" #include "ttypes.h"
#include "query.h" #include "query.h"
#include "vnode.h"
#include "tsdb.h" #include "tsdb.h"
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
@ -5425,8 +5426,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId); taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId);
} }
// TODO set the extract column id to streamHandle // set the extract column id to streamHandle
// pColList tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList);
pInfo->readerHandle = streamReadHandle; pInfo->readerHandle = streamReadHandle;
@ -5438,6 +5439,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->exec = doStreamBlockScan; pOperator->exec = doStreamBlockScan;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
return pOperator;
} }

View File

@ -1485,11 +1485,11 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
pMsg->header.vgId = htonl(tInfo.addr.nodeId); pMsg->header.vgId = htonl(tInfo.addr.nodeId);
pMsg->sId = htobe64(schMgmt.sId); pMsg->sId = schMgmt.sId;
pMsg->queryId = htobe64(plan->id.queryId); pMsg->queryId = plan->id.queryId;
pMsg->taskId = htobe64(schGenUUID()); pMsg->taskId = schGenUUID();
pMsg->taskType = TASK_TYPE_PERSISTENT; pMsg->taskType = TASK_TYPE_PERSISTENT;
pMsg->contentLen = htonl(msgLen); pMsg->contentLen = msgLen;
memcpy(pMsg->msg, msg, msgLen); memcpy(pMsg->msg, msg, msgLen);
tInfo.msg = pMsg; tInfo.msg = pMsg;