From 3ed1ab6acb1e339dcf995cb3d42b327bbc39b704 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 7 May 2022 23:19:05 +0800 Subject: [PATCH] enh(stream): auto create ctb --- example/src/tmq.c | 4 +- include/common/tdatablock.h | 14 ++--- include/libs/stream/tstream.h | 10 ++- source/common/src/tdatablock.c | 71 +++++++++++++++++++--- source/common/src/tmsg.c | 3 +- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndDef.c | 8 ++- source/dnode/mnode/impl/src/mndScheduler.c | 5 +- source/dnode/mnode/impl/src/mndStream.c | 3 + source/dnode/vnode/src/tq/tq.c | 46 +++++++++----- source/libs/stream/src/tstream.c | 10 +-- source/util/test/encodeTest.cpp | 2 +- 12 files changed, 129 insertions(+), 48 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 976d658fa6..2b73d8351f 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -101,8 +101,8 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ + pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 19b108dcbf..1677ec0d4f 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -56,11 +56,11 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); #define colDataSetNotNull_f(bm_, r_) \ do { \ - BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \ + BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \ } while (0) -#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) -#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) +#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) +#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) #define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT) @@ -187,8 +187,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u } int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); -int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, - uint32_t numOfRow2); +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, + const SColumnInfoData* pSource, uint32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); @@ -230,9 +230,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); void blockDebugShowData(const SArray* dataBlocks); int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, - tb_uid_t uid, tb_uid_t suid); + tb_uid_t uid, tb_uid_t suid); -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema); +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid); static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d7df976e1a..71462a86d4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -69,20 +69,24 @@ typedef struct { SUseDbRsp dbInfo; } STaskDispatcherShuffle; +typedef void FTbSink(void* vnode, int64_t ver, const SArray* data); + typedef struct { - int8_t reserved; + int64_t stbUid; SSchemaWrapper* pSchemaWrapper; // not applicable to encoder and decoder + void* vnode; + FTbSink* tbSinkFunc; STSchema* pTSchema; SHashObj* pHash; // groupId to tbuid } STaskSinkTb; -typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data); +typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef struct { int64_t smaId; // following are not applicable to encoder and decoder - FSmaHandle* smaHandle; + FSmaSink* smaSink; } STaskSinkSma; typedef struct { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 2c17f2c2fc..1c099afc93 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -363,9 +363,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd for (int32_t i = 0; i < pDest->info.numOfCols; ++i) { int32_t mapIndex = i; -// if (pIndexMap) { -// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); -// } + // if (pIndexMap) { + // mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); + // } SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex); @@ -1596,7 +1596,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks return TSDB_CODE_SUCCESS; } -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid) { SSubmitReq* ret = NULL; // cal size @@ -1608,7 +1608,29 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { // TODO min int32_t rowSize = pDataBlock->info.rowSize; int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); - cap += sizeof(SSubmitBlk) + rows * maxLen; + int32_t schemaLen = 0; + + if (createTb) { + SVCreateTbReq createTbReq = {0}; + createTbReq.name = "a"; + createTbReq.flags = 0; + createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = htobe64(suid); + + SKVRowBuilder kvRowBuilder = {0}; + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { + ASSERT(0); + } + tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t)); + createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder); + tdDestroyKVRowBuilder(&kvRowBuilder); + + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); + if (code < 0) return NULL; + } + + cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; } // assign data @@ -1623,19 +1645,47 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { SSubmitBlk* blkHead = submitBlk; blkHead->numOfRows = htons(pDataBlock->info.rows); - blkHead->schemaLen = 0; blkHead->sversion = htonl(pTSchema->version); // TODO - blkHead->suid = 0; - blkHead->uid = htobe64(pDataBlock->info.uid); + blkHead->suid = htobe64(suid); + // uid is assigned by vnode + blkHead->uid = 0; int32_t rows = pDataBlock->info.rows; /*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/ /*blkHead->dataLen = htonl(rows * maxLen);*/ blkHead->dataLen = 0; - void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); - STSRow* rowData = blockData; + void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); + + int32_t schemaLen = 0; + if (createTb) { + SVCreateTbReq createTbReq = {0}; + createTbReq.name = "a"; + createTbReq.flags = 0; + createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = suid; + + SKVRowBuilder kvRowBuilder = {0}; + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { + ASSERT(0); + } + tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t)); + createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder); + tdDestroyKVRowBuilder(&kvRowBuilder); + + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); + if (code < 0) return NULL; + + SEncoder encoder = {0}; + tEncoderInit(&encoder, blockData, schemaLen); + if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL; + tEncoderClear(&encoder); + } + blkHead->schemaLen = htonl(schemaLen); + + STSRow* rowData = POINTER_SHIFT(blockData, schemaLen); for (int32_t j = 0; j < rows; j++) { SRowBuilder rb = {0}; @@ -1656,6 +1706,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { int32_t len = blkHead->dataLen; blkHead->dataLen = htonl(len); blkHead = POINTER_SHIFT(blkHead, len); + /*submitBlk = blkHead;*/ } return ret; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 129143593e..56bb93faa4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -56,7 +56,6 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { ASSERT(0); } - SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen); ASSERT(pIter->len > 0); } @@ -4013,4 +4012,4 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) { tEndDecode(pCoder); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 98d29a6ca5..2751e0752e 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -574,6 +574,7 @@ typedef struct { char sourceDb[TSDB_DB_FNAME_LEN]; char targetDb[TSDB_DB_FNAME_LEN]; char targetSTbName[TSDB_TABLE_FNAME_LEN]; + int64_t targetStbUid; int64_t createTime; int64_t updateTime; int64_t uid; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index a143371089..a2c628b8a1 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -416,6 +416,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { /*int32_t outputNameSz = 0;*/ if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; @@ -465,6 +468,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; @@ -529,4 +535,4 @@ void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) { buf = taosDecodeStringTo(buf, pOffset->key); buf = taosDecodeFixedI64(buf, &pOffset->offset); return buf; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 5bfa70e76d..824f031004 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -204,6 +204,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.stbUid = pStream->targetStbUid; pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); ASSERT(pTask->tbSink.pSchemaWrapper); } @@ -244,9 +245,10 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.stbUid = pStream->targetStbUid; pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); } - // + // dispatch pTask->dispatchType = TASK_DISPATCH__NONE; @@ -319,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.stbUid = pStream->targetStbUid; pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); } #endif diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 2541232a53..8c1557b73d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -360,6 +360,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre goto _OVER; } + stbObj.uid = pStream->targetStbUid; + if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER; return 0; @@ -379,6 +381,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.createTime = taosGetTimestampMs(); streamObj.updateTime = streamObj.createTime; streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); + streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN); streamObj.dbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f12c67bb9c..e769108d59 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -884,24 +884,38 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { } } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { - if (pTask->execType == TASK_EXEC__NONE) return 0; +void tqTableSink(void* vnode, int64_t ver, const SArray* data) { + // + SVnode* pVnode = (SVnode*)vnode; + // build write msg + // +} - pTask->exec.numOfRunners = parallel; - pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner)); - if (pTask->exec.runners == NULL) { - return -1; +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { + if (pTask->execType != TASK_EXEC__NONE) { + // expand runners + pTask->exec.numOfRunners = parallel; + pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner)); + if (pTask->exec.runners == NULL) { + return -1; + } + for (int32_t i = 0; i < parallel; i++) { + STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + SReadHandle handle = { + .reader = pStreamReader, + .meta = pTq->pVnode->pMeta, + }; + pTask->exec.runners[i].inputHandle = pStreamReader; + pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); + ASSERT(pTask->exec.runners[i].executor); + } } - for (int32_t i = 0; i < parallel; i++) { - STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); - SReadHandle handle = { - .reader = pStreamReader, - .meta = pTq->pVnode->pMeta, - }; - pTask->exec.runners[i].inputHandle = pStreamReader; - pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); - ASSERT(pTask->exec.runners[i].executor); + + if (pTask->sinkType == TASK_SINK__TABLE) { + pTask->tbSink.vnode = pTq->pVnode; + pTask->tbSink.tbSinkFunc = tqTableSink; } + return 0; } @@ -925,7 +939,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { // sink pTask->ahandle = pTq->pVnode; if (pTask->sinkType == TASK_SINK__SMA) { - pTask->smaSink.smaHandle = smaHandleRes; + pTask->smaSink.smaSink = smaHandleRes; } else if (pTask->sinkType == TASK_SINK__TABLE) { ASSERT(pTask->tbSink.pSchemaWrapper); ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 707e153a0c..a78f818ded 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -152,12 +152,12 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { - /*blockDebugShowData(pRes);*/ + blockDebugShowData(pRes); ASSERT(pTask->tbSink.pTSchema); - SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema); + SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid); tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); } else if (pTask->sinkType == TASK_SINK__SMA) { - pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes); + pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); // } else if (pTask->sinkType == TASK_SINK__FETCH) { // @@ -276,7 +276,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { } if (pTask->sinkType == TASK_SINK__TABLE) { - /*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/ + if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1; if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SMA) { if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; @@ -321,7 +321,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } if (pTask->sinkType == TASK_SINK__TABLE) { - /*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/ + if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1; pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); if (pTask->tbSink.pSchemaWrapper == NULL) return -1; if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; diff --git a/source/util/test/encodeTest.cpp b/source/util/test/encodeTest.cpp index 6715cb3db8..00e12a4fe8 100644 --- a/source/util/test/encodeTest.cpp +++ b/source/util/test/encodeTest.cpp @@ -442,4 +442,4 @@ TEST(td_encode_test, compound_struct_encode_test) { #endif #pragma GCC diagnostic pop -#endif \ No newline at end of file +#endif