From 3ed1ab6acb1e339dcf995cb3d42b327bbc39b704 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 7 May 2022 23:19:05 +0800 Subject: [PATCH 1/2] enh(stream): auto create ctb --- example/src/tmq.c | 4 +- include/common/tdatablock.h | 14 ++--- include/libs/stream/tstream.h | 10 ++- source/common/src/tdatablock.c | 71 +++++++++++++++++++--- source/common/src/tmsg.c | 3 +- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndDef.c | 8 ++- source/dnode/mnode/impl/src/mndScheduler.c | 5 +- source/dnode/mnode/impl/src/mndStream.c | 3 + source/dnode/vnode/src/tq/tq.c | 46 +++++++++----- source/libs/stream/src/tstream.c | 10 +-- source/util/test/encodeTest.cpp | 2 +- 12 files changed, 129 insertions(+), 48 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 976d658fa6..2b73d8351f 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -101,8 +101,8 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ + pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 19b108dcbf..1677ec0d4f 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -56,11 +56,11 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); #define colDataSetNotNull_f(bm_, r_) \ do { \ - BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \ + BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \ } while (0) -#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) -#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) +#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) +#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) #define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT) @@ -187,8 +187,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u } int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); -int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, - uint32_t numOfRow2); +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, + const SColumnInfoData* pSource, uint32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); @@ -230,9 +230,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); void blockDebugShowData(const SArray* dataBlocks); int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, - tb_uid_t uid, tb_uid_t suid); + tb_uid_t uid, tb_uid_t suid); -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema); +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid); static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d7df976e1a..71462a86d4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -69,20 +69,24 @@ typedef struct { SUseDbRsp dbInfo; } STaskDispatcherShuffle; +typedef void FTbSink(void* vnode, int64_t ver, const SArray* data); + typedef struct { - int8_t reserved; + int64_t stbUid; SSchemaWrapper* pSchemaWrapper; // not applicable to encoder and decoder + void* vnode; + FTbSink* tbSinkFunc; STSchema* pTSchema; SHashObj* pHash; // groupId to tbuid } STaskSinkTb; -typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data); +typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef struct { int64_t smaId; // following are not applicable to encoder and decoder - FSmaHandle* smaHandle; + FSmaSink* smaSink; } STaskSinkSma; typedef struct { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 2c17f2c2fc..1c099afc93 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -363,9 +363,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd for (int32_t i = 0; i < pDest->info.numOfCols; ++i) { int32_t mapIndex = i; -// if (pIndexMap) { -// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); -// } + // if (pIndexMap) { + // mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); + // } SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex); @@ -1596,7 +1596,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks return TSDB_CODE_SUCCESS; } -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid) { SSubmitReq* ret = NULL; // cal size @@ -1608,7 +1608,29 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { // TODO min int32_t rowSize = pDataBlock->info.rowSize; int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); - cap += sizeof(SSubmitBlk) + rows * maxLen; + int32_t schemaLen = 0; + + if (createTb) { + SVCreateTbReq createTbReq = {0}; + createTbReq.name = "a"; + createTbReq.flags = 0; + createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = htobe64(suid); + + SKVRowBuilder kvRowBuilder = {0}; + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { + ASSERT(0); + } + tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t)); + createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder); + tdDestroyKVRowBuilder(&kvRowBuilder); + + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); + if (code < 0) return NULL; + } + + cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; } // assign data @@ -1623,19 +1645,47 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { SSubmitBlk* blkHead = submitBlk; blkHead->numOfRows = htons(pDataBlock->info.rows); - blkHead->schemaLen = 0; blkHead->sversion = htonl(pTSchema->version); // TODO - blkHead->suid = 0; - blkHead->uid = htobe64(pDataBlock->info.uid); + blkHead->suid = htobe64(suid); + // uid is assigned by vnode + blkHead->uid = 0; int32_t rows = pDataBlock->info.rows; /*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/ /*blkHead->dataLen = htonl(rows * maxLen);*/ blkHead->dataLen = 0; - void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); - STSRow* rowData = blockData; + void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); + + int32_t schemaLen = 0; + if (createTb) { + SVCreateTbReq createTbReq = {0}; + createTbReq.name = "a"; + createTbReq.flags = 0; + createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = suid; + + SKVRowBuilder kvRowBuilder = {0}; + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { + ASSERT(0); + } + tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t)); + createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder); + tdDestroyKVRowBuilder(&kvRowBuilder); + + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); + if (code < 0) return NULL; + + SEncoder encoder = {0}; + tEncoderInit(&encoder, blockData, schemaLen); + if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL; + tEncoderClear(&encoder); + } + blkHead->schemaLen = htonl(schemaLen); + + STSRow* rowData = POINTER_SHIFT(blockData, schemaLen); for (int32_t j = 0; j < rows; j++) { SRowBuilder rb = {0}; @@ -1656,6 +1706,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { int32_t len = blkHead->dataLen; blkHead->dataLen = htonl(len); blkHead = POINTER_SHIFT(blkHead, len); + /*submitBlk = blkHead;*/ } return ret; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 129143593e..56bb93faa4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -56,7 +56,6 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { ASSERT(0); } - SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen); ASSERT(pIter->len > 0); } @@ -4013,4 +4012,4 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) { tEndDecode(pCoder); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 98d29a6ca5..2751e0752e 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -574,6 +574,7 @@ typedef struct { char sourceDb[TSDB_DB_FNAME_LEN]; char targetDb[TSDB_DB_FNAME_LEN]; char targetSTbName[TSDB_TABLE_FNAME_LEN]; + int64_t targetStbUid; int64_t createTime; int64_t updateTime; int64_t uid; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index a143371089..a2c628b8a1 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -416,6 +416,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { /*int32_t outputNameSz = 0;*/ if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; @@ -465,6 +468,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; @@ -529,4 +535,4 @@ void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) { buf = taosDecodeStringTo(buf, pOffset->key); buf = taosDecodeFixedI64(buf, &pOffset->offset); return buf; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 5bfa70e76d..824f031004 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -204,6 +204,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.stbUid = pStream->targetStbUid; pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); ASSERT(pTask->tbSink.pSchemaWrapper); } @@ -244,9 +245,10 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.stbUid = pStream->targetStbUid; pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); } - // + // dispatch pTask->dispatchType = TASK_DISPATCH__NONE; @@ -319,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.stbUid = pStream->targetStbUid; pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); } #endif diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 2541232a53..8c1557b73d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -360,6 +360,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre goto _OVER; } + stbObj.uid = pStream->targetStbUid; + if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER; return 0; @@ -379,6 +381,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.createTime = taosGetTimestampMs(); streamObj.updateTime = streamObj.createTime; streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); + streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN); streamObj.dbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f12c67bb9c..e769108d59 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -884,24 +884,38 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { } } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { - if (pTask->execType == TASK_EXEC__NONE) return 0; +void tqTableSink(void* vnode, int64_t ver, const SArray* data) { + // + SVnode* pVnode = (SVnode*)vnode; + // build write msg + // +} - pTask->exec.numOfRunners = parallel; - pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner)); - if (pTask->exec.runners == NULL) { - return -1; +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { + if (pTask->execType != TASK_EXEC__NONE) { + // expand runners + pTask->exec.numOfRunners = parallel; + pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner)); + if (pTask->exec.runners == NULL) { + return -1; + } + for (int32_t i = 0; i < parallel; i++) { + STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + SReadHandle handle = { + .reader = pStreamReader, + .meta = pTq->pVnode->pMeta, + }; + pTask->exec.runners[i].inputHandle = pStreamReader; + pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); + ASSERT(pTask->exec.runners[i].executor); + } } - for (int32_t i = 0; i < parallel; i++) { - STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); - SReadHandle handle = { - .reader = pStreamReader, - .meta = pTq->pVnode->pMeta, - }; - pTask->exec.runners[i].inputHandle = pStreamReader; - pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); - ASSERT(pTask->exec.runners[i].executor); + + if (pTask->sinkType == TASK_SINK__TABLE) { + pTask->tbSink.vnode = pTq->pVnode; + pTask->tbSink.tbSinkFunc = tqTableSink; } + return 0; } @@ -925,7 +939,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { // sink pTask->ahandle = pTq->pVnode; if (pTask->sinkType == TASK_SINK__SMA) { - pTask->smaSink.smaHandle = smaHandleRes; + pTask->smaSink.smaSink = smaHandleRes; } else if (pTask->sinkType == TASK_SINK__TABLE) { ASSERT(pTask->tbSink.pSchemaWrapper); ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 707e153a0c..a78f818ded 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -152,12 +152,12 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { - /*blockDebugShowData(pRes);*/ + blockDebugShowData(pRes); ASSERT(pTask->tbSink.pTSchema); - SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema); + SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid); tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); } else if (pTask->sinkType == TASK_SINK__SMA) { - pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes); + pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); // } else if (pTask->sinkType == TASK_SINK__FETCH) { // @@ -276,7 +276,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { } if (pTask->sinkType == TASK_SINK__TABLE) { - /*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/ + if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1; if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SMA) { if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; @@ -321,7 +321,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } if (pTask->sinkType == TASK_SINK__TABLE) { - /*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/ + if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1; pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); if (pTask->tbSink.pSchemaWrapper == NULL) return -1; if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; diff --git a/source/util/test/encodeTest.cpp b/source/util/test/encodeTest.cpp index 6715cb3db8..00e12a4fe8 100644 --- a/source/util/test/encodeTest.cpp +++ b/source/util/test/encodeTest.cpp @@ -442,4 +442,4 @@ TEST(td_encode_test, compound_struct_encode_test) { #endif #pragma GCC diagnostic pop -#endif \ No newline at end of file +#endif From db3cbbf22d6179a2cb9d148b0008e7cab848abbd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 8 May 2022 00:21:17 +0800 Subject: [PATCH 2/2] fix(query): column match --- source/libs/executor/src/scanoperator.c | 189 ++++++++++++------------ source/libs/stream/src/tstream.c | 2 +- 2 files changed, 99 insertions(+), 92 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 72f285231b..0296a312d0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -38,7 +38,8 @@ #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity); -static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); +static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, + const char* dbName); static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { @@ -159,7 +160,8 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return false; } -static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { +static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, + uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableScanInfo* pInfo = pOperator->info; @@ -189,7 +191,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { pCost->loadBlockStatis += 1; - bool allColumnsHaveAgg = true; + bool allColumnsHaveAgg = true; SColumnDataAgg** pColAgg = NULL; tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg); @@ -261,7 +263,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; - SSDataBlock* pBlock = pTableScanInfo->pResBlock; + SSDataBlock* pBlock = pTableScanInfo->pResBlock; while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { if (isTaskKilled(pOperator->pTaskInfo)) { @@ -344,7 +346,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { pTableScanInfo->scanFlag = REPEAT_SCAN; qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64 - "-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); + "-%" PRId64, + GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); // do prepare for the next round table scan operation tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -373,22 +376,22 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon pInfo->cond = *pCond; pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]}; - pInfo->interval = *pInterval; - pInfo->sampleRatio = sampleRatio; + pInfo->interval = *pInterval; + pInfo->sampleRatio = sampleRatio; pInfo->dataBlockLoadFlag = dataLoadFlag; - pInfo->pResBlock = pResBlock; - pInfo->pFilterNode = pCondition; - pInfo->dataReader = pDataReader; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColMatchInfo; + pInfo->pResBlock = pResBlock; + pInfo->pFilterNode = pCondition; + pInfo->dataReader = pDataReader; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; - pOperator->name = "TableScanOperator"; // for dubug purpose - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = numOfOutput; - pOperator->pTaskInfo = pTaskInfo; + pOperator->name = "TableScanOperator"; // for dubug purpose + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfOutput; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL); @@ -404,17 +407,17 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pInfo->dataReader = pReadHandle; -// pInfo->prevGroupId = -1; + pInfo->dataReader = pReadHandle; + // pInfo->prevGroupId = -1; - pOperator->name = "TableSeqScanOperator"; + pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL); return pOperator; @@ -514,18 +517,18 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) { SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex); - TSKEY* ts = (TSKEY*)pColDataInfo->pData; + TSKEY* ts = (TSKEY*)pColDataInfo->pData; for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) { if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) { - taosArrayPush(pInfo->tsArray, ts+i); + taosArrayPush(pInfo->tsArray, ts + i); } } if (taosArrayGetSize(pInfo->tsArray) > 0) { - //TODO(liuyao) get from tsdb - // SSDataBlock* p = createOneDataBlock(pInfo->pRes, true); - // p->info.type = STREAM_INVERT; - // taosArrayClear(pInfo->tsArray); - // return p; + // TODO(liuyao) get from tsdb + // SSDataBlock* p = createOneDataBlock(pInfo->pRes, true); + // p->info.type = STREAM_INVERT; + // taosArrayClear(pInfo->tsArray); + // return p; return NULL; } return NULL; @@ -535,7 +538,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; - int32_t rows = 0; + int32_t rows = 0; pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { @@ -571,7 +574,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { int32_t numOfRows = 0; int16_t outputCol = 0; - int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol); + int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol); if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { pTaskInfo->code = code; @@ -652,8 +655,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); for (int32_t i = 0; i < numOfOutput; ++i) { - int16_t* id = taosArrayGet(pColList, i); - taosArrayPush(pColIds, id); + SColMatchInfo* id = taosArrayGet(pColList, i); + int16_t colId = id->colId; + taosArrayPush(pColIds, &colId); } pInfo->pColMatchInfo = pColList; @@ -678,8 +682,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* return NULL; } - pInfo->primaryTsIndex = 0; //TODO(liuyao) get it from physical plan - pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); //TODO(liuyao) get it from physical plan + pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan + pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); // TODO(liuyao) get it from physical plan if (pInfo->pUpdateInfo == NULL) { taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -687,25 +691,26 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* } pInfo->readerHandle = streamReadHandle; - pInfo->pRes = pResBlock; - pInfo->pCondition = pCondition; + pInfo->pRes = pResBlock; + pInfo->pCondition = pCondition; - pOperator->name = "StreamBlockScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pResBlock->info.numOfCols; - pOperator->fpSet._openFn = operatorDummyOpenFn; + pOperator->name = "StreamBlockScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pResBlock->info.numOfCols; + pOperator->fpSet._openFn = operatorDummyOpenFn; pOperator->fpSet.getNextFn = doStreamBlockScan; - pOperator->fpSet.closeFn = operatorDummyCloseFn; + pOperator->fpSet.closeFn = operatorDummyCloseFn; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); return pOperator; - _error: +_error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); return NULL; @@ -774,7 +779,7 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) { if (NULL == pCondition) { return; } - nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*) dbName); + nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName); } static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { @@ -809,7 +814,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { code = filterSetDataFromSlotId(filter, ¶m1); int8_t* rowRes = NULL; - bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); + bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); filterFreeInfo(filter); SSDataBlock* px = createOneDataBlock(pInfo->pRes, false); @@ -853,13 +858,13 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { static SSDataBlock* buildSysTableMetaBlock() { SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - size_t size = 0; - const SSysTableMeta *pMeta = NULL; + size_t size = 0; + const SSysTableMeta* pMeta = NULL; getInfosDbMeta(&pMeta, &size); int32_t index = 0; - for(int32_t i = 0; i < size; ++i) { - if(strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) { + for (int32_t i = 0; i < size; ++i) { + if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) { index = i; break; } @@ -867,7 +872,7 @@ static SSDataBlock* buildSysTableMetaBlock() { pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData)); - for(int32_t i = 0; i < pMeta[index].colNum; ++i) { + for (int32_t i = 0; i < pMeta[index].colNum; ++i) { SColumnInfoData colInfoData = {0}; colInfoData.info.colId = i + 1; colInfoData.info.type = pMeta[index].schema[i].type; @@ -1091,7 +1096,7 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { SSDataBlock* p = buildSysTableMetaBlock(); blockDataEnsureCapacity(p, capacity); - size_t size = 0; + size_t size = 0; const SSysTableMeta* pSysDbTableMeta = NULL; getInfosDbMeta(&pSysDbTableMeta, &size); @@ -1100,18 +1105,19 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { getPerfDbMeta(&pSysDbTableMeta, &size); p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB); - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock); -// blockDataDestroy(p); todo handle memory leak + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock); + // blockDataDestroy(p); todo handle memory leak pInfo->pRes->info.rows = p->info.rows; return p->info.rows; } -int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName) { - char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; +int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, + const char* dbName) { + char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; int32_t numOfRows = p->info.rows; - for(int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < size; ++i) { const SSysTableMeta* pm = &pSysDbTableMeta[i]; SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0); @@ -1132,7 +1138,7 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT pColInfoData = taosArrayGet(p->pDataBlock, 3); colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false); - for(int32_t j = 4; j <= 8; ++j) { + for (int32_t j = 4; j <= 8; ++j) { pColInfoData = taosArrayGet(p->pDataBlock, j); colDataAppendNULL(pColInfoData, numOfRows); } @@ -1160,18 +1166,18 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe return NULL; } - pInfo->accountId = accountId; + pInfo->accountId = accountId; pInfo->showRewrite = showRewrite; - pInfo->pRes = pResBlock; - pInfo->pCondition = pCondition; - pInfo->scanCols = colList; + pInfo->pRes = pResBlock; + pInfo->pCondition = pCondition; + pInfo->scanCols = colList; initResultSizeInfo(pOperator, 4096); tNameAssign(&pInfo->name, pName); const char* name = tNameGetTableName(&pInfo->name); if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { - pInfo->readHandle = *(SReadHandle*) readHandle; + pInfo->readHandle = *(SReadHandle*)readHandle; blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); } else { tsem_init(&pInfo->ready, 0, 0); @@ -1201,14 +1207,14 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe #endif } - pOperator->name = "SysTableScanOperator"; + pOperator->name = "SysTableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pResBlock->info.numOfCols; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, - NULL, NULL, NULL); + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pResBlock->info.numOfCols; + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -1355,26 +1361,27 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { } SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, - SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, SArray* pColMatchInfo, + STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pTableGroups = pTableGroupInfo; - pInfo->pColMatchInfo = pColMatchInfo; - pInfo->pRes = pResBlock; - pInfo->readHandle = *pReadHandle; - pInfo->curPos = 0; - pOperator->name = "TagScanOperator"; + pInfo->pTableGroups = pTableGroupInfo; + pInfo->pColMatchInfo = pColMatchInfo; + pInfo->pRes = pResBlock; + pInfo->readHandle = *pReadHandle; + pInfo->curPos = 0; + pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pExpr = pExpr; - pOperator->numOfExprs = numOfOutput; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExpr; + pOperator->numOfExprs = numOfOutput; + pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(pOperator, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index a78f818ded..3b49dca800 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -152,7 +152,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { - blockDebugShowData(pRes); + /*blockDebugShowData(pRes);*/ ASSERT(pTask->tbSink.pTSchema); SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid); tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);