Merge pull request #14657 from taosdata/feature/stream
refactor(stream): internal refactor
This commit is contained in:
commit
7872b16d31
|
@ -117,6 +117,20 @@ typedef struct SSDataBlock {
|
|||
SDataBlockInfo info;
|
||||
} SSDataBlock;
|
||||
|
||||
enum {
|
||||
FETCH_TYPE__DATA = 1,
|
||||
FETCH_TYPE__META,
|
||||
FETCH_TYPE__NONE,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int8_t fetchType;
|
||||
union {
|
||||
SSDataBlock data;
|
||||
void* meta;
|
||||
};
|
||||
} SFetchRet;
|
||||
|
||||
typedef struct SVarColAttr {
|
||||
int32_t* offset; // start position for each entry in the list
|
||||
uint32_t length; // used buffer size that contain the valid data
|
||||
|
|
|
@ -30,7 +30,7 @@ struct SRpcMsg;
|
|||
struct SSubplan;
|
||||
|
||||
typedef struct SReadHandle {
|
||||
void* streamReader;
|
||||
void* tqReader;
|
||||
void* meta;
|
||||
void* config;
|
||||
void* vnode;
|
||||
|
@ -38,7 +38,7 @@ typedef struct SReadHandle {
|
|||
SMsgCb* pMsgCb;
|
||||
bool initMetaReader;
|
||||
bool initTableReader;
|
||||
bool initStreamReader;
|
||||
bool initTqReader;
|
||||
} SReadHandle;
|
||||
|
||||
typedef enum {
|
||||
|
@ -52,7 +52,7 @@ typedef enum {
|
|||
* @param streamReadHandle
|
||||
* @return
|
||||
*/
|
||||
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
|
||||
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
|
||||
|
||||
/**
|
||||
* Switch the stream scan to snapshot mode
|
||||
|
@ -176,6 +176,9 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
|
|||
|
||||
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
|
||||
|
||||
void* qExtractReaderFromStreamScanner(void* scanner);
|
||||
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -235,11 +235,6 @@ typedef struct SStreamTask {
|
|||
int8_t taskStatus;
|
||||
int8_t execStatus;
|
||||
|
||||
// exec info
|
||||
int64_t enqueueVer;
|
||||
int64_t processedVer;
|
||||
int64_t checkpointVer;
|
||||
|
||||
// node info
|
||||
int32_t selfChildId;
|
||||
int32_t nodeId;
|
||||
|
|
|
@ -64,8 +64,8 @@ int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32
|
|||
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
|
||||
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
|
||||
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
|
||||
void * vnodeGetIdx(SVnode *pVnode);
|
||||
void * vnodeGetIvtIdx(SVnode *pVnode);
|
||||
void *vnodeGetIdx(SVnode *pVnode);
|
||||
void *vnodeGetIvtIdx(SVnode *pVnode);
|
||||
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
||||
|
@ -95,7 +95,7 @@ typedef struct SMetaFltParam {
|
|||
tb_uid_t suid;
|
||||
int16_t cid;
|
||||
int16_t type;
|
||||
char * val;
|
||||
char *val;
|
||||
bool reverse;
|
||||
int (*filterFunc)(void *a, void *b, int16_t type);
|
||||
|
||||
|
@ -136,8 +136,8 @@ SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdLis
|
|||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
|
||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
||||
void * tsdbGetIdx(SMeta *pMeta);
|
||||
void * tsdbGetIvtIdx(SMeta *pMeta);
|
||||
void *tsdbGetIdx(SMeta *pMeta);
|
||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||
|
||||
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t *colId, int32_t numOfCols,
|
||||
void **pReader);
|
||||
|
@ -146,19 +146,37 @@ int32_t tsdbLastrowReaderClose(void *pReader);
|
|||
|
||||
// tq
|
||||
|
||||
typedef struct STqReadHandle SStreamReader;
|
||||
typedef struct STqReader {
|
||||
int64_t ver;
|
||||
const SSubmitReq *pMsg;
|
||||
SSubmitBlk *pBlock;
|
||||
SSubmitMsgIter msgIter;
|
||||
SSubmitBlkIter blkIter;
|
||||
|
||||
SStreamReader *tqInitSubmitMsgScanner(SMeta *pMeta);
|
||||
SWalReader *pWalReader;
|
||||
|
||||
void tqReadHandleSetColIdList(SStreamReader *pReadHandle, SArray *pColIdList);
|
||||
int32_t tqReadHandleSetTbUidList(SStreamReader *pHandle, const SArray *tbUidList);
|
||||
int32_t tqReadHandleAddTbUidList(SStreamReader *pHandle, const SArray *tbUidList);
|
||||
int32_t tqReadHandleRemoveTbUidList(SStreamReader *pHandle, const SArray *tbUidList);
|
||||
SMeta *pVnodeMeta;
|
||||
SHashObj *tbIdHash;
|
||||
SArray *pColIdList; // SArray<int16_t>
|
||||
|
||||
int32_t tqReadHandleSetMsg(SStreamReader *pHandle, SSubmitReq *pMsg, int64_t ver);
|
||||
bool tqNextDataBlock(SStreamReader *pHandle);
|
||||
bool tqNextDataBlockFilterOut(SStreamReader *pHandle, SHashObj *filterOutUids);
|
||||
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, SStreamReader *pHandle);
|
||||
int32_t cachedSchemaVer;
|
||||
int64_t cachedSchemaSuid;
|
||||
SSchemaWrapper *pSchemaWrapper;
|
||||
STSchema *pSchema;
|
||||
} STqReader;
|
||||
|
||||
STqReader *tqOpenReader(SVnode *pVnode);
|
||||
void tqCloseReader(STqReader *);
|
||||
|
||||
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
|
||||
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||
|
||||
int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver);
|
||||
bool tqNextDataBlock(STqReader *pReader);
|
||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
|
||||
|
||||
// sma
|
||||
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||
|
@ -214,7 +232,7 @@ struct SMetaEntry {
|
|||
int8_t type;
|
||||
int8_t flags; // TODO: need refactor?
|
||||
tb_uid_t uid;
|
||||
char * name;
|
||||
char *name;
|
||||
union {
|
||||
struct {
|
||||
SSchemaWrapper schemaRow;
|
||||
|
@ -225,7 +243,7 @@ struct SMetaEntry {
|
|||
int64_t ctime;
|
||||
int32_t ttlDays;
|
||||
int32_t commentLen;
|
||||
char * comment;
|
||||
char *comment;
|
||||
tb_uid_t suid;
|
||||
uint8_t *pTags;
|
||||
} ctbEntry;
|
||||
|
@ -233,7 +251,7 @@ struct SMetaEntry {
|
|||
int64_t ctime;
|
||||
int32_t ttlDays;
|
||||
int32_t commentLen;
|
||||
char * comment;
|
||||
char *comment;
|
||||
int32_t ncid; // next column id
|
||||
SSchemaWrapper schemaRow;
|
||||
} ntbEntry;
|
||||
|
@ -247,17 +265,17 @@ struct SMetaEntry {
|
|||
|
||||
struct SMetaReader {
|
||||
int32_t flags;
|
||||
SMeta * pMeta;
|
||||
SMeta *pMeta;
|
||||
SDecoder coder;
|
||||
SMetaEntry me;
|
||||
void * pBuf;
|
||||
void *pBuf;
|
||||
int32_t szBuf;
|
||||
};
|
||||
|
||||
struct SMTbCursor {
|
||||
TBC * pDbc;
|
||||
void * pKey;
|
||||
void * pVal;
|
||||
TBC *pDbc;
|
||||
void *pKey;
|
||||
void *pVal;
|
||||
int32_t kLen;
|
||||
int32_t vLen;
|
||||
SMetaReader mr;
|
||||
|
|
|
@ -44,25 +44,6 @@ extern "C" {
|
|||
|
||||
typedef struct STqOffsetStore STqOffsetStore;
|
||||
|
||||
// tqRead
|
||||
|
||||
struct STqReadHandle {
|
||||
int64_t ver;
|
||||
const SSubmitReq* pMsg;
|
||||
SSubmitBlk* pBlock;
|
||||
SSubmitMsgIter msgIter;
|
||||
SSubmitBlkIter blkIter;
|
||||
|
||||
SMeta* pVnodeMeta;
|
||||
SHashObj* tbIdHash;
|
||||
SArray* pColIdList; // SArray<int16_t>
|
||||
|
||||
int32_t cachedSchemaVer;
|
||||
int64_t cachedSchemaSuid;
|
||||
SSchemaWrapper* pSchemaWrapper;
|
||||
STSchema* pSchema;
|
||||
};
|
||||
|
||||
// tqPush
|
||||
|
||||
typedef struct {
|
||||
|
@ -102,7 +83,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
int8_t subType;
|
||||
|
||||
SStreamReader* pExecReader[5];
|
||||
STqReader* pExecReader[5];
|
||||
union {
|
||||
STqExecCol execCol;
|
||||
STqExecTb execTb;
|
||||
|
@ -118,7 +99,7 @@ typedef struct {
|
|||
int32_t epoch;
|
||||
int8_t fetchMeta;
|
||||
|
||||
// reader
|
||||
// TODO remove
|
||||
SWalReader* pWalReader;
|
||||
|
||||
// push
|
||||
|
|
|
@ -327,14 +327,14 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SStreamReader *pReadHandle = tqInitSubmitMsgScanner(pMeta);
|
||||
if (!pReadHandle) {
|
||||
STqReader *pReader = tqOpenReader(pVnode);
|
||||
if (!pReader) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SReadHandle handle = {
|
||||
.streamReader = pReadHandle,
|
||||
.tqReader = pReader,
|
||||
.meta = pMeta,
|
||||
.pMsgCb = pMsgCb,
|
||||
.vnode = pVnode,
|
||||
|
@ -366,7 +366,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
tdFreeRSmaInfo(pRSmaInfo);
|
||||
taosMemoryFree(pReadHandle);
|
||||
taosMemoryFree(pReader);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
|
|
@ -447,26 +447,38 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
pHandle->fetchMeta = req.withMeta;
|
||||
|
||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
}
|
||||
/*for (int32_t i = 0; i < 5; i++) {*/
|
||||
/*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
|
||||
/*}*/
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
pHandle->execHandle.execCol.qmsg = req.qmsg;
|
||||
req.qmsg = NULL;
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
SReadHandle handle = {
|
||||
.streamReader = pHandle->execHandle.pExecReader[i],
|
||||
.tqReader = pHandle->execHandle.pExecReader[i],
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.vnode = pTq->pVnode,
|
||||
.initTableReader = true,
|
||||
.initTqReader = true,
|
||||
};
|
||||
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
||||
ASSERT(pHandle->execHandle.execCol.task[i]);
|
||||
void* scanner = NULL;
|
||||
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner);
|
||||
ASSERT(scanner);
|
||||
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner);
|
||||
ASSERT(pHandle->execHandle.pExecReader[i]);
|
||||
}
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
|
||||
}
|
||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
|
||||
}
|
||||
pHandle->execHandle.execTb.suid = req.suid;
|
||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
|
||||
|
@ -476,7 +488,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
tqDebug("vg %d, idx %d, uid: %ld", TD_VID(pTq->pVnode), i, tbUid);
|
||||
}
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
tqReadHandleSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
|
||||
tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
|
||||
}
|
||||
taosArrayDestroy(tbUidList);
|
||||
}
|
||||
|
@ -532,7 +544,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
SReadHandle handle = {
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.vnode = pTq->pVnode,
|
||||
.initStreamReader = 1,
|
||||
.initTqReader = 1,
|
||||
};
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
} else {
|
||||
|
|
|
@ -135,8 +135,8 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
|
|||
}
|
||||
} else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
pRsp->withSchema = 1;
|
||||
SStreamReader* pReader = pExec->pExecReader[workerId];
|
||||
tqReadHandleSetMsg(pReader, pReq, 0);
|
||||
STqReader* pReader = pExec->pExecReader[workerId];
|
||||
tqReaderSetDataMsg(pReader, pReq, 0);
|
||||
while (tqNextDataBlock(pReader)) {
|
||||
SSDataBlock block = {0};
|
||||
if (tqRetrieveDataBlock(&block, pReader) < 0) {
|
||||
|
@ -153,8 +153,8 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
|
|||
}
|
||||
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
||||
pRsp->withSchema = 1;
|
||||
SStreamReader* pReader = pExec->pExecReader[workerId];
|
||||
tqReadHandleSetMsg(pReader, pReq, 0);
|
||||
STqReader* pReader = pExec->pExecReader[workerId];
|
||||
tqReaderSetDataMsg(pReader, pReq, 0);
|
||||
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
|
||||
SSDataBlock block = {0};
|
||||
if (tqRetrieveDataBlock(&block, pReader) < 0) {
|
||||
|
|
|
@ -79,12 +79,12 @@ int32_t tqMetaOpen(STQ* pTq) {
|
|||
tDecodeSTqHandle(&decoder, &handle);
|
||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
|
||||
}
|
||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
SReadHandle reader = {
|
||||
.streamReader = handle.execHandle.pExecReader[i],
|
||||
.tqReader = handle.execHandle.pExecReader[i],
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
.vnode = pTq->pVnode,
|
||||
|
|
|
@ -15,6 +15,11 @@
|
|||
|
||||
#include "tq.h"
|
||||
|
||||
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset) {
|
||||
/*if ()*/
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
||||
int32_t code = 0;
|
||||
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
||||
|
@ -73,53 +78,107 @@ END:
|
|||
return code;
|
||||
}
|
||||
|
||||
SStreamReader* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
||||
SStreamReader* pReadHandle = taosMemoryMalloc(sizeof(SStreamReader));
|
||||
if (pReadHandle == NULL) {
|
||||
STqReader* tqOpenReader(SVnode* pVnode) {
|
||||
STqReader* pReader = taosMemoryMalloc(sizeof(STqReader));
|
||||
if (pReader == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pReadHandle->pVnodeMeta = pMeta;
|
||||
pReadHandle->pMsg = NULL;
|
||||
pReadHandle->ver = -1;
|
||||
pReadHandle->pColIdList = NULL;
|
||||
pReadHandle->cachedSchemaVer = 0;
|
||||
pReadHandle->cachedSchemaSuid = 0;
|
||||
pReadHandle->pSchema = NULL;
|
||||
pReadHandle->pSchemaWrapper = NULL;
|
||||
pReadHandle->tbIdHash = NULL;
|
||||
return pReadHandle;
|
||||
|
||||
// TODO open
|
||||
/*pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);*/
|
||||
|
||||
pReader->pVnodeMeta = pVnode->pMeta;
|
||||
pReader->pMsg = NULL;
|
||||
pReader->ver = -1;
|
||||
pReader->pColIdList = NULL;
|
||||
pReader->cachedSchemaVer = 0;
|
||||
pReader->cachedSchemaSuid = 0;
|
||||
pReader->pSchema = NULL;
|
||||
pReader->pSchemaWrapper = NULL;
|
||||
pReader->tbIdHash = NULL;
|
||||
return pReader;
|
||||
}
|
||||
|
||||
int32_t tqReadHandleSetMsg(SStreamReader* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
|
||||
pReadHandle->pMsg = pMsg;
|
||||
void tqCloseReader(STqReader* pReader) {
|
||||
// close wal reader
|
||||
// free cached schema
|
||||
// free hash
|
||||
taosMemoryFree(pReader);
|
||||
}
|
||||
|
||||
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
||||
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||
bool fromProcessedMsg = pReader->pMsg != NULL;
|
||||
|
||||
while (1) {
|
||||
if (!fromProcessedMsg) {
|
||||
if (walNextValidMsg(pReader->pWalReader) < 0) {
|
||||
ret->fetchType = FETCH_TYPE__NONE;
|
||||
return -1;
|
||||
}
|
||||
void* body = pReader->pWalReader->pHead->head.body;
|
||||
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
|
||||
// TODO do filter
|
||||
ret->fetchType = FETCH_TYPE__META;
|
||||
ret->meta = pReader->pWalReader->pHead->head.body;
|
||||
return 0;
|
||||
} else {
|
||||
tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
|
||||
}
|
||||
}
|
||||
|
||||
while (tqNextDataBlock(pReader)) {
|
||||
memset(&ret->data, 0, sizeof(SSDataBlock));
|
||||
int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
|
||||
if (code != 0 || ret->data.info.rows == 0) {
|
||||
if (fromProcessedMsg) {
|
||||
ret->fetchType = FETCH_TYPE__NONE;
|
||||
return 0;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ret->fetchType = FETCH_TYPE__DATA;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (fromProcessedMsg) {
|
||||
ret->fetchType = FETCH_TYPE__NONE;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tqReaderSetDataMsg(STqReader* pReader, SSubmitReq* pMsg, int64_t ver) {
|
||||
pReader->pMsg = pMsg;
|
||||
|
||||
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
|
||||
while (true) {
|
||||
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
|
||||
if (pReadHandle->pBlock == NULL) break;
|
||||
if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
|
||||
if (pReader->pBlock == NULL) break;
|
||||
}
|
||||
|
||||
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
||||
pReadHandle->ver = ver;
|
||||
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
|
||||
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
|
||||
pReader->ver = ver;
|
||||
memset(&pReader->blkIter, 0, sizeof(SSubmitBlkIter));
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool tqNextDataBlock(SStreamReader* pHandle) {
|
||||
if (pHandle->pMsg == NULL) return false;
|
||||
bool tqNextDataBlock(STqReader* pReader) {
|
||||
if (pReader->pMsg == NULL) return false;
|
||||
while (1) {
|
||||
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
||||
if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) {
|
||||
return false;
|
||||
}
|
||||
if (pHandle->pBlock == NULL) {
|
||||
pHandle->pMsg = NULL;
|
||||
if (pReader->pBlock == NULL) {
|
||||
pReader->pMsg = NULL;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
if (pReader->tbIdHash == NULL) {
|
||||
return true;
|
||||
}
|
||||
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
|
||||
void* ret = taosHashGet(pReader->tbIdHash, &pReader->msgIter.uid, sizeof(int64_t));
|
||||
/*tqDebug("search uid %ld", pHandle->msgIter.uid);*/
|
||||
if (ret != NULL) {
|
||||
/*tqDebug("find uid %ld", pHandle->msgIter.uid);*/
|
||||
|
@ -129,7 +188,7 @@ bool tqNextDataBlock(SStreamReader* pHandle) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool tqNextDataBlockFilterOut(SStreamReader* pHandle, SHashObj* filterOutUids) {
|
||||
bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
|
||||
while (1) {
|
||||
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
||||
return false;
|
||||
|
@ -145,38 +204,38 @@ bool tqNextDataBlockFilterOut(SStreamReader* pHandle, SHashObj* filterOutUids) {
|
|||
return false;
|
||||
}
|
||||
|
||||
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
|
||||
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
|
||||
// TODO: cache multiple schema
|
||||
int32_t sversion = htonl(pHandle->pBlock->sversion);
|
||||
if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion ||
|
||||
pHandle->cachedSchemaSuid != pHandle->msgIter.suid) {
|
||||
if (pHandle->pSchema) taosMemoryFree(pHandle->pSchema);
|
||||
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
|
||||
if (pHandle->pSchema == NULL) {
|
||||
int32_t sversion = htonl(pReader->pBlock->sversion);
|
||||
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
|
||||
pReader->cachedSchemaSuid != pReader->msgIter.suid) {
|
||||
if (pReader->pSchema) taosMemoryFree(pReader->pSchema);
|
||||
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion);
|
||||
if (pReader->pSchema == NULL) {
|
||||
tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table",
|
||||
pHandle->msgIter.uid, pHandle->msgIter.suid, pHandle->cachedSchemaVer);
|
||||
pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer);
|
||||
/*ASSERT(0);*/
|
||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pHandle->pSchemaWrapper) tDeleteSSchemaWrapper(pHandle->pSchemaWrapper);
|
||||
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion, true);
|
||||
if (pHandle->pSchemaWrapper == NULL) {
|
||||
if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
|
||||
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, true);
|
||||
if (pReader->pSchemaWrapper == NULL) {
|
||||
tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table",
|
||||
pHandle->msgIter.uid, pHandle->cachedSchemaVer);
|
||||
pReader->msgIter.uid, pReader->cachedSchemaVer);
|
||||
/*ASSERT(0);*/
|
||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||
return -1;
|
||||
}
|
||||
pHandle->cachedSchemaVer = sversion;
|
||||
pHandle->cachedSchemaSuid = pHandle->msgIter.suid;
|
||||
pReader->cachedSchemaVer = sversion;
|
||||
pReader->cachedSchemaSuid = pReader->msgIter.suid;
|
||||
}
|
||||
|
||||
STSchema* pTschema = pHandle->pSchema;
|
||||
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
|
||||
STSchema* pTschema = pReader->pSchema;
|
||||
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
|
||||
|
||||
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
|
||||
int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList);
|
||||
|
||||
if (colNumNeed == 0) {
|
||||
int32_t colMeta = 0;
|
||||
|
@ -199,7 +258,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
|
|||
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
|
||||
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
|
||||
col_id_t colIdSchema = pColSchema->colId;
|
||||
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pHandle->pColIdList, colNeed);
|
||||
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, colNeed);
|
||||
if (colIdSchema < colIdNeed) {
|
||||
colMeta++;
|
||||
} else if (colIdSchema > colIdNeed) {
|
||||
|
@ -216,7 +275,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
|
|||
}
|
||||
}
|
||||
|
||||
if (blockDataEnsureCapacity(pBlock, pHandle->msgIter.numOfRows) < 0) {
|
||||
if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -227,13 +286,12 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
|
|||
STSRow* row;
|
||||
int32_t curRow = 0;
|
||||
|
||||
tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
|
||||
tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);
|
||||
|
||||
pBlock->info.groupId = 0;
|
||||
pBlock->info.uid = pHandle->msgIter.uid;
|
||||
pBlock->info.rows = pHandle->msgIter.numOfRows;
|
||||
pBlock->info.uid = pReader->msgIter.uid;
|
||||
pBlock->info.rows = pReader->msgIter.numOfRows;
|
||||
|
||||
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
||||
while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
|
||||
tdSTSRowIterReset(&iter, row);
|
||||
// get all wanted col of that block
|
||||
for (int32_t i = 0; i < colActual; i++) {
|
||||
|
@ -255,9 +313,9 @@ FAIL:
|
|||
return -1;
|
||||
}
|
||||
|
||||
void tqReadHandleSetColIdList(SStreamReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
|
||||
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
|
||||
|
||||
int tqReadHandleSetTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
|
||||
int tqReaderSetTbUidList(STqReader* pHandle, const SArray* tbUidList) {
|
||||
if (pHandle->tbIdHash) {
|
||||
taosHashClear(pHandle->tbIdHash);
|
||||
}
|
||||
|
@ -276,7 +334,7 @@ int tqReadHandleSetTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tqReadHandleAddTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
|
||||
int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) {
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
|
@ -293,7 +351,7 @@ int tqReadHandleAddTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tqReadHandleRemoveTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
|
||||
int tqReaderRemoveTbUidList(STqReader* pHandle, const SArray* tbUidList) {
|
||||
ASSERT(pHandle->tbIdHash != NULL);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
|
|
|
@ -365,7 +365,7 @@ typedef struct SStreamScanInfo {
|
|||
int32_t blockType; // current block type
|
||||
int32_t validBlockIndex; // Is current data has returned?
|
||||
uint64_t numOfExec; // execution times
|
||||
void* streamReader;// stream block reader handle
|
||||
STqReader* tqReader;
|
||||
|
||||
int32_t tsArrayIndex;
|
||||
SArray* tsArray;
|
||||
|
@ -383,6 +383,11 @@ typedef struct SStreamScanInfo {
|
|||
SSDataBlock* pPullDataRes; // pull data SSDataBlock
|
||||
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
|
||||
int32_t deleteDataIndex;
|
||||
|
||||
// status for tmq
|
||||
//SSchemaWrapper schema;
|
||||
STqOffset offset;
|
||||
|
||||
} SStreamScanInfo;
|
||||
|
||||
typedef struct SSysTableScanInfo {
|
||||
|
|
|
@ -45,7 +45,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
pInfo->blockType = type;
|
||||
|
||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
if (tqReadHandleSetMsg(pInfo->streamReader, input, 0) < 0) {
|
||||
if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {
|
||||
qError("submit msg messed up when initing stream block, %s" PRIx64, id);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
@ -105,7 +105,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
|
|||
return code;
|
||||
}
|
||||
|
||||
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
||||
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
|
||||
if (msg == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -120,7 +120,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
|||
}
|
||||
|
||||
qTaskInfo_t pTaskInfo = NULL;
|
||||
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
|
||||
code = qCreateExecTask(readers, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// TODO: destroy SSubplan & pTaskInfo
|
||||
terrno = code;
|
||||
|
@ -174,11 +174,11 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList);
|
||||
|
||||
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
|
||||
code = tqReadHandleAddTbUidList(pScanInfo->streamReader, qa);
|
||||
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
|
||||
taosArrayDestroy(qa);
|
||||
} else { // remove the table id in current list
|
||||
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
|
||||
code = tqReadHandleRemoveTbUidList(pScanInfo->streamReader, tableIdList);
|
||||
code = tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -236,6 +236,37 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
|
|||
return decodeOperator(pTaskInfo->pRoot, pInput, len);
|
||||
}
|
||||
|
||||
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||
|
||||
while (1) {
|
||||
uint8_t type = pOperator->operatorType;
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
*scanner = pOperator->info;
|
||||
return 0;
|
||||
} else {
|
||||
ASSERT(pOperator->numOfDownstream == 1);
|
||||
pOperator = pOperator->pDownstream[0];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void* qExtractReaderFromStreamScanner(void* scanner) {
|
||||
SStreamScanInfo* pInfo = scanner;
|
||||
return (void*)pInfo->tqReader;
|
||||
}
|
||||
|
||||
const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
|
||||
SStreamScanInfo* pInfo = scanner;
|
||||
return pInfo->tqReader->pSchemaWrapper;
|
||||
}
|
||||
|
||||
const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
|
||||
SStreamScanInfo* pInfo = scanner;
|
||||
return &pInfo->offset;
|
||||
}
|
||||
|
||||
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
|
||||
|
|
|
@ -2844,7 +2844,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
|
|||
}
|
||||
|
||||
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
||||
int32_t type = pOperator->operatorType;
|
||||
uint8_t type = pOperator->operatorType;
|
||||
|
||||
pOperator->status = OP_OPENED;
|
||||
|
||||
|
@ -4346,11 +4346,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
|
||||
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
|
||||
|
||||
// int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||
// if (code) {
|
||||
// pTaskInfo->code = code;
|
||||
// return NULL;
|
||||
// }
|
||||
// int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||
// if (code) {
|
||||
// pTaskInfo->code = code;
|
||||
// return NULL;
|
||||
// }
|
||||
|
||||
int32_t code = extractTableSchemaVersion(pHandle, pScanNode->uid, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4407,8 +4407,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
|
||||
pScalarExprInfo, numOfScalarExpr, pTaskInfo);
|
||||
} else {
|
||||
pOptr =
|
||||
createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo);
|
||||
pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions,
|
||||
pScalarExprInfo, numOfScalarExpr, pTaskInfo);
|
||||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
|
||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
|
@ -4575,7 +4575,8 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
|||
return pList;
|
||||
}
|
||||
|
||||
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, const char* idstr) {
|
||||
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, const char* idstr) {
|
||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -4819,7 +4820,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
|||
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
|
||||
&(*pTaskInfo)->tableqinfoList, pPlan->user);
|
||||
|
||||
|
||||
if (NULL == (*pTaskInfo)->pRoot) {
|
||||
code = (*pTaskInfo)->code;
|
||||
goto _complete;
|
||||
|
|
|
@ -1232,38 +1232,33 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
|
||||
while (tqNextDataBlock(pInfo->streamReader)) {
|
||||
while (tqNextDataBlock(pInfo->tqReader)) {
|
||||
SSDataBlock block = {0};
|
||||
|
||||
// todo refactor
|
||||
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamReader);
|
||||
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
|
||||
|
||||
uint64_t groupId = block.info.groupId;
|
||||
uint64_t uid = block.info.uid;
|
||||
int32_t numOfRows = block.info.rows;
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
|
||||
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pInfo->pRes->info.groupId = groupId;
|
||||
pInfo->pRes->info.rows = numOfRows;
|
||||
pInfo->pRes->info.uid = uid;
|
||||
pInfo->pRes->info.rows = block.info.rows;
|
||||
pInfo->pRes->info.uid = block.info.uid;
|
||||
pInfo->pRes->info.type = STREAM_NORMAL;
|
||||
pInfo->pRes->info.capacity = numOfRows;
|
||||
pInfo->pRes->info.capacity = block.info.rows;
|
||||
|
||||
// for generating rollup SMA result, each time is an independent time serie.
|
||||
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
|
||||
if (pInfo->assignBlockUid) {
|
||||
pInfo->pRes->info.groupId = uid;
|
||||
} else {
|
||||
pInfo->pRes->info.groupId = groupId;
|
||||
pInfo->pRes->info.groupId = block.info.uid;
|
||||
}
|
||||
|
||||
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
|
||||
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &block.info.uid, sizeof(int64_t));
|
||||
if (groupIdPre) {
|
||||
pInfo->pRes->info.groupId = *groupIdPre;
|
||||
} else {
|
||||
pInfo->pRes->info.groupId = 0;
|
||||
}
|
||||
|
||||
// todo extract method
|
||||
|
@ -1413,13 +1408,13 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
}
|
||||
}
|
||||
|
||||
if (pHandle->initStreamReader) {
|
||||
ASSERT(pHandle->streamReader == NULL);
|
||||
pInfo->streamReader = tqInitSubmitMsgScanner(pHandle->meta);
|
||||
ASSERT(pInfo->streamReader);
|
||||
if (pHandle->initTqReader) {
|
||||
ASSERT(pHandle->tqReader == NULL);
|
||||
pInfo->tqReader = tqOpenReader(pHandle->vnode);
|
||||
ASSERT(pInfo->tqReader);
|
||||
} else {
|
||||
ASSERT(pHandle->streamReader);
|
||||
pInfo->streamReader = pHandle->streamReader;
|
||||
ASSERT(pHandle->tqReader);
|
||||
pInfo->tqReader = pHandle->tqReader;
|
||||
}
|
||||
|
||||
if (pSTInfo->interval.interval > 0) {
|
||||
|
@ -1435,9 +1430,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->tableUid = pScanPhyNode->uid;
|
||||
|
||||
// set the extract column id to streamHandle
|
||||
tqReadHandleSetColIdList(pInfo->streamReader, pColIds);
|
||||
tqReaderSetColIdList(pInfo->tqReader, pColIds);
|
||||
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
|
||||
int32_t code = tqReadHandleSetTbUidList(pInfo->streamReader, tableIdList);
|
||||
int32_t code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
|
||||
if (code != 0) {
|
||||
taosArrayDestroy(tableIdList);
|
||||
goto _error;
|
||||
|
|
Loading…
Reference in New Issue