From 3cad99a178df2aecaad1542416a7f09bfa1672be Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 7 May 2022 12:21:00 +0000 Subject: [PATCH 01/11] refact --- source/dnode/vnode/src/tsdb/tsdbMemTable2.c | 46 ++++++++++++++++----- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index e401782747..dbea55f633 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -76,6 +76,8 @@ struct SMemSkipListCurosr { #define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l) #define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l) +static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); + // SMemTable int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { SMemTable *pMemTb = NULL; @@ -176,20 +178,19 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p // do insert data to SMemData SMemSkipListCurosr slc = {0}; - const uint8_t *p = pSubmitBlk->pData; - const uint8_t *pt; const STSRow *pRow; - uint64_t szRow; + uint32_t szRow; SDecoder decoder = {0}; - // tCoderInit(&coder, TD_LITTLE_ENDIAN, pSubmitBlk->pData, pSubmitBlk->nData, TD_DECODER); + tDecoderInit(&decoder, pSubmitBlk->pData, pSubmitBlk->nData); for (;;) { - // if (tDecodeIsEnd(&coder)) break; + if (tDecodeIsEnd(&decoder)) break; + + if (tDecodeBinary(&decoder, (const uint8_t **)&pRow, &szRow) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } - // if (tDecodeBinary(&coder, (const uint8_t **)&pRow, &szRow) < 0) { - // terrno = TSDB_CODE_INVALID_MSG; - // return -1; - // } // check the row (todo) // // move the cursor to position to write (todo) @@ -197,7 +198,9 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p // tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c); // ASSERT(c); - // // encode row + // encode row + int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); + // int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); // int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt); // pSlNode = vnodeBufPoolMalloc(pPool, tsize); @@ -215,7 +218,7 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p if (pRow->ts < pMemData->minKey) pMemData->minKey = pRow->ts; if (pRow->ts > pMemData->maxKey) pMemData->maxKey = pRow->ts; } - // tCoderClear(&coder); + tDecoderClear(&decoder); // tsdbMemSkipListCursorClose(&slc); // update status @@ -228,4 +231,25 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version; return 0; +} + +static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { + int8_t level = 1; + const uint32_t factor = 4; + + if (pSl->size) { + while ((taosRandR(&pSl->seed) % factor) == 0 && level < pSl->maxLevel) { + level++; + } + + if (level > pSl->level) { + if (pSl->level < pSl->maxLevel) { + level = pSl->level + 1; + } else { + level = pSl->level; + } + } + } + + return level; } \ No newline at end of file From 16897f17dcbb0fe652f522e36ed237ae720db8dc Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 7 May 2022 13:16:07 +0000 Subject: [PATCH 02/11] more refact --- source/dnode/vnode/src/tsdb/tsdbMemTable2.c | 25 +++++++++------------ 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index dbea55f633..952ccfda9c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -199,12 +199,15 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p // ASSERT(c); // encode row - int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); + int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); + int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (0 /*todo*/); + SMemSkipListNode *pNode = vnodeBufPoolMalloc(pPool, tsize); + if (pNode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - // int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); - // int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt); - // pSlNode = vnodeBufPoolMalloc(pPool, tsize); - // pSlNode->level = level; + pNode->level = level; // uint8_t *pData = SL_NODE_DATA(pSlNode); // *(int64_t *)pData = version; @@ -235,20 +238,14 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { int8_t level = 1; + int8_t tlevel; const uint32_t factor = 4; if (pSl->size) { - while ((taosRandR(&pSl->seed) % factor) == 0 && level < pSl->maxLevel) { + tlevel = TMIN(pSl->maxLevel, pSl->level + 1); + while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) { level++; } - - if (level > pSl->level) { - if (pSl->level < pSl->maxLevel) { - level = pSl->level + 1; - } else { - level = pSl->level; - } - } } return level; From 712a1bfa71c14d4861847a77d6b87be27a45cbfd Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 7 May 2022 22:40:15 +0800 Subject: [PATCH 03/11] refactor: adjust trans logs --- source/dnode/mnode/impl/src/mndTrans.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index e57e9a0461..3d4254da03 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -967,7 +967,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) pAction->msgSent = 0; pAction->msgReceived = 0; pAction->errCode = 0; - mDebug("trans:%d, action:%d is reset and will be re-executed", pTrans->id, action); + mDebug("trans:%d, action:%d execute status is reset", pTrans->id, action); } } @@ -1043,7 +1043,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA return errCode; } } else { - mDebug("trans:%d, %d of %d actions executing", pTrans->id, numOfReceived, numOfActions); + mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfReceived, numOfActions); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } } From 48ca2e5f76888c12ad5c9a5ff1c154b34749a6db Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 7 May 2022 22:47:18 +0800 Subject: [PATCH 04/11] fix: datadir info is not recorded in monitor --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 9329e7b15d..bafb9e7a5b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -51,6 +51,7 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) { pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs; pMgmt->state = pInfo->vstat; + tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs); taosArrayDestroy(vloads.pVloads); } From 3ed1ab6acb1e339dcf995cb3d42b327bbc39b704 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 7 May 2022 23:19:05 +0800 Subject: [PATCH 05/11] enh(stream): auto create ctb --- example/src/tmq.c | 4 +- include/common/tdatablock.h | 14 ++--- include/libs/stream/tstream.h | 10 ++- source/common/src/tdatablock.c | 71 +++++++++++++++++++--- source/common/src/tmsg.c | 3 +- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndDef.c | 8 ++- source/dnode/mnode/impl/src/mndScheduler.c | 5 +- source/dnode/mnode/impl/src/mndStream.c | 3 + source/dnode/vnode/src/tq/tq.c | 46 +++++++++----- source/libs/stream/src/tstream.c | 10 +-- source/util/test/encodeTest.cpp | 2 +- 12 files changed, 129 insertions(+), 48 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 976d658fa6..2b73d8351f 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -101,8 +101,8 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ + pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 19b108dcbf..1677ec0d4f 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -56,11 +56,11 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); #define colDataSetNotNull_f(bm_, r_) \ do { \ - BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \ + BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \ } while (0) -#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) -#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) +#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) +#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) #define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT) @@ -187,8 +187,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u } int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); -int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, - uint32_t numOfRow2); +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, + const SColumnInfoData* pSource, uint32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); @@ -230,9 +230,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); void blockDebugShowData(const SArray* dataBlocks); int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, - tb_uid_t uid, tb_uid_t suid); + tb_uid_t uid, tb_uid_t suid); -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema); +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid); static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d7df976e1a..71462a86d4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -69,20 +69,24 @@ typedef struct { SUseDbRsp dbInfo; } STaskDispatcherShuffle; +typedef void FTbSink(void* vnode, int64_t ver, const SArray* data); + typedef struct { - int8_t reserved; + int64_t stbUid; SSchemaWrapper* pSchemaWrapper; // not applicable to encoder and decoder + void* vnode; + FTbSink* tbSinkFunc; STSchema* pTSchema; SHashObj* pHash; // groupId to tbuid } STaskSinkTb; -typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data); +typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef struct { int64_t smaId; // following are not applicable to encoder and decoder - FSmaHandle* smaHandle; + FSmaSink* smaSink; } STaskSinkSma; typedef struct { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 2c17f2c2fc..1c099afc93 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -363,9 +363,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd for (int32_t i = 0; i < pDest->info.numOfCols; ++i) { int32_t mapIndex = i; -// if (pIndexMap) { -// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); -// } + // if (pIndexMap) { + // mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); + // } SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex); @@ -1596,7 +1596,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks return TSDB_CODE_SUCCESS; } -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid) { SSubmitReq* ret = NULL; // cal size @@ -1608,7 +1608,29 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { // TODO min int32_t rowSize = pDataBlock->info.rowSize; int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); - cap += sizeof(SSubmitBlk) + rows * maxLen; + int32_t schemaLen = 0; + + if (createTb) { + SVCreateTbReq createTbReq = {0}; + createTbReq.name = "a"; + createTbReq.flags = 0; + createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = htobe64(suid); + + SKVRowBuilder kvRowBuilder = {0}; + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { + ASSERT(0); + } + tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t)); + createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder); + tdDestroyKVRowBuilder(&kvRowBuilder); + + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); + if (code < 0) return NULL; + } + + cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; } // assign data @@ -1623,19 +1645,47 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { SSubmitBlk* blkHead = submitBlk; blkHead->numOfRows = htons(pDataBlock->info.rows); - blkHead->schemaLen = 0; blkHead->sversion = htonl(pTSchema->version); // TODO - blkHead->suid = 0; - blkHead->uid = htobe64(pDataBlock->info.uid); + blkHead->suid = htobe64(suid); + // uid is assigned by vnode + blkHead->uid = 0; int32_t rows = pDataBlock->info.rows; /*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/ /*blkHead->dataLen = htonl(rows * maxLen);*/ blkHead->dataLen = 0; - void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); - STSRow* rowData = blockData; + void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); + + int32_t schemaLen = 0; + if (createTb) { + SVCreateTbReq createTbReq = {0}; + createTbReq.name = "a"; + createTbReq.flags = 0; + createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = suid; + + SKVRowBuilder kvRowBuilder = {0}; + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { + ASSERT(0); + } + tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t)); + createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder); + tdDestroyKVRowBuilder(&kvRowBuilder); + + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); + if (code < 0) return NULL; + + SEncoder encoder = {0}; + tEncoderInit(&encoder, blockData, schemaLen); + if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL; + tEncoderClear(&encoder); + } + blkHead->schemaLen = htonl(schemaLen); + + STSRow* rowData = POINTER_SHIFT(blockData, schemaLen); for (int32_t j = 0; j < rows; j++) { SRowBuilder rb = {0}; @@ -1656,6 +1706,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { int32_t len = blkHead->dataLen; blkHead->dataLen = htonl(len); blkHead = POINTER_SHIFT(blkHead, len); + /*submitBlk = blkHead;*/ } return ret; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 129143593e..56bb93faa4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -56,7 +56,6 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { ASSERT(0); } - SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen); ASSERT(pIter->len > 0); } @@ -4013,4 +4012,4 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) { tEndDecode(pCoder); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 98d29a6ca5..2751e0752e 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -574,6 +574,7 @@ typedef struct { char sourceDb[TSDB_DB_FNAME_LEN]; char targetDb[TSDB_DB_FNAME_LEN]; char targetSTbName[TSDB_TABLE_FNAME_LEN]; + int64_t targetStbUid; int64_t createTime; int64_t updateTime; int64_t uid; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index a143371089..a2c628b8a1 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -416,6 +416,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { /*int32_t outputNameSz = 0;*/ if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; @@ -465,6 +468,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; @@ -529,4 +535,4 @@ void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) { buf = taosDecodeStringTo(buf, pOffset->key); buf = taosDecodeFixedI64(buf, &pOffset->offset); return buf; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 5bfa70e76d..824f031004 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -204,6 +204,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.stbUid = pStream->targetStbUid; pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); ASSERT(pTask->tbSink.pSchemaWrapper); } @@ -244,9 +245,10 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.stbUid = pStream->targetStbUid; pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); } - // + // dispatch pTask->dispatchType = TASK_DISPATCH__NONE; @@ -319,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.stbUid = pStream->targetStbUid; pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); } #endif diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 2541232a53..8c1557b73d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -360,6 +360,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre goto _OVER; } + stbObj.uid = pStream->targetStbUid; + if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER; return 0; @@ -379,6 +381,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.createTime = taosGetTimestampMs(); streamObj.updateTime = streamObj.createTime; streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); + streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN); streamObj.dbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f12c67bb9c..e769108d59 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -884,24 +884,38 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { } } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { - if (pTask->execType == TASK_EXEC__NONE) return 0; +void tqTableSink(void* vnode, int64_t ver, const SArray* data) { + // + SVnode* pVnode = (SVnode*)vnode; + // build write msg + // +} - pTask->exec.numOfRunners = parallel; - pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner)); - if (pTask->exec.runners == NULL) { - return -1; +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { + if (pTask->execType != TASK_EXEC__NONE) { + // expand runners + pTask->exec.numOfRunners = parallel; + pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner)); + if (pTask->exec.runners == NULL) { + return -1; + } + for (int32_t i = 0; i < parallel; i++) { + STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + SReadHandle handle = { + .reader = pStreamReader, + .meta = pTq->pVnode->pMeta, + }; + pTask->exec.runners[i].inputHandle = pStreamReader; + pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); + ASSERT(pTask->exec.runners[i].executor); + } } - for (int32_t i = 0; i < parallel; i++) { - STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); - SReadHandle handle = { - .reader = pStreamReader, - .meta = pTq->pVnode->pMeta, - }; - pTask->exec.runners[i].inputHandle = pStreamReader; - pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); - ASSERT(pTask->exec.runners[i].executor); + + if (pTask->sinkType == TASK_SINK__TABLE) { + pTask->tbSink.vnode = pTq->pVnode; + pTask->tbSink.tbSinkFunc = tqTableSink; } + return 0; } @@ -925,7 +939,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { // sink pTask->ahandle = pTq->pVnode; if (pTask->sinkType == TASK_SINK__SMA) { - pTask->smaSink.smaHandle = smaHandleRes; + pTask->smaSink.smaSink = smaHandleRes; } else if (pTask->sinkType == TASK_SINK__TABLE) { ASSERT(pTask->tbSink.pSchemaWrapper); ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 707e153a0c..a78f818ded 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -152,12 +152,12 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { - /*blockDebugShowData(pRes);*/ + blockDebugShowData(pRes); ASSERT(pTask->tbSink.pTSchema); - SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema); + SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid); tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); } else if (pTask->sinkType == TASK_SINK__SMA) { - pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes); + pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); // } else if (pTask->sinkType == TASK_SINK__FETCH) { // @@ -276,7 +276,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { } if (pTask->sinkType == TASK_SINK__TABLE) { - /*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/ + if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1; if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SMA) { if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; @@ -321,7 +321,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } if (pTask->sinkType == TASK_SINK__TABLE) { - /*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/ + if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1; pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); if (pTask->tbSink.pSchemaWrapper == NULL) return -1; if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; diff --git a/source/util/test/encodeTest.cpp b/source/util/test/encodeTest.cpp index 6715cb3db8..00e12a4fe8 100644 --- a/source/util/test/encodeTest.cpp +++ b/source/util/test/encodeTest.cpp @@ -442,4 +442,4 @@ TEST(td_encode_test, compound_struct_encode_test) { #endif #pragma GCC diagnostic pop -#endif \ No newline at end of file +#endif From db3cbbf22d6179a2cb9d148b0008e7cab848abbd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 8 May 2022 00:21:17 +0800 Subject: [PATCH 06/11] fix(query): column match --- source/libs/executor/src/scanoperator.c | 189 ++++++++++++------------ source/libs/stream/src/tstream.c | 2 +- 2 files changed, 99 insertions(+), 92 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 72f285231b..0296a312d0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -38,7 +38,8 @@ #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity); -static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); +static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, + const char* dbName); static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { @@ -159,7 +160,8 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return false; } -static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { +static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, + uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableScanInfo* pInfo = pOperator->info; @@ -189,7 +191,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { pCost->loadBlockStatis += 1; - bool allColumnsHaveAgg = true; + bool allColumnsHaveAgg = true; SColumnDataAgg** pColAgg = NULL; tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg); @@ -261,7 +263,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; - SSDataBlock* pBlock = pTableScanInfo->pResBlock; + SSDataBlock* pBlock = pTableScanInfo->pResBlock; while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { if (isTaskKilled(pOperator->pTaskInfo)) { @@ -344,7 +346,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { pTableScanInfo->scanFlag = REPEAT_SCAN; qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64 - "-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); + "-%" PRId64, + GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); // do prepare for the next round table scan operation tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -373,22 +376,22 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon pInfo->cond = *pCond; pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]}; - pInfo->interval = *pInterval; - pInfo->sampleRatio = sampleRatio; + pInfo->interval = *pInterval; + pInfo->sampleRatio = sampleRatio; pInfo->dataBlockLoadFlag = dataLoadFlag; - pInfo->pResBlock = pResBlock; - pInfo->pFilterNode = pCondition; - pInfo->dataReader = pDataReader; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColMatchInfo; + pInfo->pResBlock = pResBlock; + pInfo->pFilterNode = pCondition; + pInfo->dataReader = pDataReader; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; - pOperator->name = "TableScanOperator"; // for dubug purpose - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = numOfOutput; - pOperator->pTaskInfo = pTaskInfo; + pOperator->name = "TableScanOperator"; // for dubug purpose + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfOutput; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL); @@ -404,17 +407,17 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pInfo->dataReader = pReadHandle; -// pInfo->prevGroupId = -1; + pInfo->dataReader = pReadHandle; + // pInfo->prevGroupId = -1; - pOperator->name = "TableSeqScanOperator"; + pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL); return pOperator; @@ -514,18 +517,18 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) { SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex); - TSKEY* ts = (TSKEY*)pColDataInfo->pData; + TSKEY* ts = (TSKEY*)pColDataInfo->pData; for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) { if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) { - taosArrayPush(pInfo->tsArray, ts+i); + taosArrayPush(pInfo->tsArray, ts + i); } } if (taosArrayGetSize(pInfo->tsArray) > 0) { - //TODO(liuyao) get from tsdb - // SSDataBlock* p = createOneDataBlock(pInfo->pRes, true); - // p->info.type = STREAM_INVERT; - // taosArrayClear(pInfo->tsArray); - // return p; + // TODO(liuyao) get from tsdb + // SSDataBlock* p = createOneDataBlock(pInfo->pRes, true); + // p->info.type = STREAM_INVERT; + // taosArrayClear(pInfo->tsArray); + // return p; return NULL; } return NULL; @@ -535,7 +538,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; - int32_t rows = 0; + int32_t rows = 0; pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { @@ -571,7 +574,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { int32_t numOfRows = 0; int16_t outputCol = 0; - int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol); + int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol); if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { pTaskInfo->code = code; @@ -652,8 +655,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); for (int32_t i = 0; i < numOfOutput; ++i) { - int16_t* id = taosArrayGet(pColList, i); - taosArrayPush(pColIds, id); + SColMatchInfo* id = taosArrayGet(pColList, i); + int16_t colId = id->colId; + taosArrayPush(pColIds, &colId); } pInfo->pColMatchInfo = pColList; @@ -678,8 +682,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* return NULL; } - pInfo->primaryTsIndex = 0; //TODO(liuyao) get it from physical plan - pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); //TODO(liuyao) get it from physical plan + pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan + pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); // TODO(liuyao) get it from physical plan if (pInfo->pUpdateInfo == NULL) { taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -687,25 +691,26 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* } pInfo->readerHandle = streamReadHandle; - pInfo->pRes = pResBlock; - pInfo->pCondition = pCondition; + pInfo->pRes = pResBlock; + pInfo->pCondition = pCondition; - pOperator->name = "StreamBlockScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pResBlock->info.numOfCols; - pOperator->fpSet._openFn = operatorDummyOpenFn; + pOperator->name = "StreamBlockScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pResBlock->info.numOfCols; + pOperator->fpSet._openFn = operatorDummyOpenFn; pOperator->fpSet.getNextFn = doStreamBlockScan; - pOperator->fpSet.closeFn = operatorDummyCloseFn; + pOperator->fpSet.closeFn = operatorDummyCloseFn; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); return pOperator; - _error: +_error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); return NULL; @@ -774,7 +779,7 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) { if (NULL == pCondition) { return; } - nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*) dbName); + nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName); } static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { @@ -809,7 +814,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { code = filterSetDataFromSlotId(filter, ¶m1); int8_t* rowRes = NULL; - bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); + bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); filterFreeInfo(filter); SSDataBlock* px = createOneDataBlock(pInfo->pRes, false); @@ -853,13 +858,13 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { static SSDataBlock* buildSysTableMetaBlock() { SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - size_t size = 0; - const SSysTableMeta *pMeta = NULL; + size_t size = 0; + const SSysTableMeta* pMeta = NULL; getInfosDbMeta(&pMeta, &size); int32_t index = 0; - for(int32_t i = 0; i < size; ++i) { - if(strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) { + for (int32_t i = 0; i < size; ++i) { + if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) { index = i; break; } @@ -867,7 +872,7 @@ static SSDataBlock* buildSysTableMetaBlock() { pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData)); - for(int32_t i = 0; i < pMeta[index].colNum; ++i) { + for (int32_t i = 0; i < pMeta[index].colNum; ++i) { SColumnInfoData colInfoData = {0}; colInfoData.info.colId = i + 1; colInfoData.info.type = pMeta[index].schema[i].type; @@ -1091,7 +1096,7 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { SSDataBlock* p = buildSysTableMetaBlock(); blockDataEnsureCapacity(p, capacity); - size_t size = 0; + size_t size = 0; const SSysTableMeta* pSysDbTableMeta = NULL; getInfosDbMeta(&pSysDbTableMeta, &size); @@ -1100,18 +1105,19 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { getPerfDbMeta(&pSysDbTableMeta, &size); p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB); - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock); -// blockDataDestroy(p); todo handle memory leak + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock); + // blockDataDestroy(p); todo handle memory leak pInfo->pRes->info.rows = p->info.rows; return p->info.rows; } -int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName) { - char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; +int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, + const char* dbName) { + char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; int32_t numOfRows = p->info.rows; - for(int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < size; ++i) { const SSysTableMeta* pm = &pSysDbTableMeta[i]; SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0); @@ -1132,7 +1138,7 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT pColInfoData = taosArrayGet(p->pDataBlock, 3); colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false); - for(int32_t j = 4; j <= 8; ++j) { + for (int32_t j = 4; j <= 8; ++j) { pColInfoData = taosArrayGet(p->pDataBlock, j); colDataAppendNULL(pColInfoData, numOfRows); } @@ -1160,18 +1166,18 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe return NULL; } - pInfo->accountId = accountId; + pInfo->accountId = accountId; pInfo->showRewrite = showRewrite; - pInfo->pRes = pResBlock; - pInfo->pCondition = pCondition; - pInfo->scanCols = colList; + pInfo->pRes = pResBlock; + pInfo->pCondition = pCondition; + pInfo->scanCols = colList; initResultSizeInfo(pOperator, 4096); tNameAssign(&pInfo->name, pName); const char* name = tNameGetTableName(&pInfo->name); if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { - pInfo->readHandle = *(SReadHandle*) readHandle; + pInfo->readHandle = *(SReadHandle*)readHandle; blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); } else { tsem_init(&pInfo->ready, 0, 0); @@ -1201,14 +1207,14 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe #endif } - pOperator->name = "SysTableScanOperator"; + pOperator->name = "SysTableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pResBlock->info.numOfCols; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, - NULL, NULL, NULL); + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pResBlock->info.numOfCols; + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -1355,26 +1361,27 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { } SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, - SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, SArray* pColMatchInfo, + STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pTableGroups = pTableGroupInfo; - pInfo->pColMatchInfo = pColMatchInfo; - pInfo->pRes = pResBlock; - pInfo->readHandle = *pReadHandle; - pInfo->curPos = 0; - pOperator->name = "TagScanOperator"; + pInfo->pTableGroups = pTableGroupInfo; + pInfo->pColMatchInfo = pColMatchInfo; + pInfo->pRes = pResBlock; + pInfo->readHandle = *pReadHandle; + pInfo->curPos = 0; + pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pExpr = pExpr; - pOperator->numOfExprs = numOfOutput; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExpr; + pOperator->numOfExprs = numOfOutput; + pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(pOperator, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index a78f818ded..3b49dca800 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -152,7 +152,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { - blockDebugShowData(pRes); + /*blockDebugShowData(pRes);*/ ASSERT(pTask->tbSink.pTSchema); SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid); tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); From d17b478e96afcf6838209b819215b2a1c0c77d22 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 8 May 2022 02:37:23 +0800 Subject: [PATCH 07/11] feat(stream): auto create ctb --- include/common/tdatablock.h | 2 +- include/libs/stream/tstream.h | 9 +- source/common/src/tdatablock.c | 18 ++- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 3 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 8 ++ source/dnode/vnode/src/tq/tq.c | 18 ++- source/libs/executor/src/timewindowoperator.c | 104 +++++++++--------- source/libs/stream/src/tstream.c | 6 +- 10 files changed, 99 insertions(+), 71 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 1677ec0d4f..cea40f4785 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -232,7 +232,7 @@ void blockDebugShowData(const SArray* dataBlocks); int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t uid, tb_uid_t suid); -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid); +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, int32_t vgId); static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 71462a86d4..e277622c40 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -25,6 +25,8 @@ extern "C" { #ifndef _TSTREAM_H_ #define _TSTREAM_H_ +typedef struct SStreamTask SStreamTask; + enum { STREAM_TASK_STATUS__RUNNING = 1, STREAM_TASK_STATUS__STOP, @@ -69,7 +71,7 @@ typedef struct { SUseDbRsp dbInfo; } STaskDispatcherShuffle; -typedef void FTbSink(void* vnode, int64_t ver, const SArray* data); +typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); typedef struct { int64_t stbUid; @@ -119,7 +121,7 @@ enum { TASK_SINK__FETCH, }; -typedef struct { +struct SStreamTask { int64_t streamId; int32_t taskId; int8_t status; @@ -154,8 +156,7 @@ typedef struct { // application storage void* ahandle; - -} SStreamTask; +}; SStreamTask* tNewSStreamTask(int64_t streamId); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 1c099afc93..0d0bbb07be 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1596,7 +1596,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks return TSDB_CODE_SUCCESS; } -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid) { +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, + int32_t vgId) { SSubmitReq* ret = NULL; // cal size @@ -1634,9 +1635,11 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo } // assign data - ret = taosMemoryCalloc(1, cap); + ret = taosMemoryCalloc(1, cap + 46); + ret = POINTER_SHIFT(ret, 46); + ret->header.vgId = vgId; ret->version = htonl(1); - ret->length = htonl(cap - sizeof(SSubmitReq)); + ret->length = sizeof(SSubmitReq); ret->numOfBlocks = htonl(sz); void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq)); @@ -1703,11 +1706,14 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo rowData = POINTER_SHIFT(rowData, rowLen); blkHead->dataLen += rowLen; } - int32_t len = blkHead->dataLen; - blkHead->dataLen = htonl(len); - blkHead = POINTER_SHIFT(blkHead, len); + int32_t dataLen = blkHead->dataLen; + blkHead->dataLen = htonl(dataLen); + + ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; + blkHead = POINTER_SHIFT(blkHead, schemaLen + dataLen); /*submitBlk = blkHead;*/ } + ret->length = htonl(ret->length); return ret; } diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 4b42f97f53..fa5e1d78a0 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -106,6 +106,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); // sync integration +int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 9329e7b15d..8939aa23e0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -177,6 +177,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.pWrapper = pMgmt->pWrapper; + msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 3088c5dea4..17c7563885 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -135,6 +135,7 @@ static void *vmOpenVnodeFunc(void *param) { SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.pWrapper = pMgmt->pWrapper; + msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; @@ -147,7 +148,7 @@ static void *vmOpenVnodeFunc(void *param) { pThread->failed++; } else { vmOpenVnode(pMgmt, pCfg, pImpl); - //vnodeStart(pImpl); + // vnodeStart(pImpl); dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->opened++; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index f3d7253e71..06e24cb48a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -357,6 +357,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT pMsg->rpcMsg = *pRpc; // if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0); switch (qtype) { + case WRITE_QUEUE: + dTrace("msg:%p, will be put into vnode-write queue", pMsg); + taosWriteQitem(pVnode->pWriteQ, pMsg); + break; case QUERY_QUEUE: dTrace("msg:%p, will be put into vnode-query queue", pMsg); taosWriteQitem(pVnode->pQueryQ, pMsg); @@ -387,6 +391,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT return code; } +int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, WRITE_QUEUE); +} + int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e769108d59..b7378e44f2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -884,11 +884,21 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { } } -void tqTableSink(void* vnode, int64_t ver, const SArray* data) { - // - SVnode* pVnode = (SVnode*)vnode; +void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { + const SArray* pRes = (const SArray*)data; + SVnode* pVnode = (SVnode*)vnode; + + ASSERT(pTask->tbSink.pTSchema); + SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pVnode->config.vgId); + tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); // build write msg - // + SRpcMsg msg = { + .msgType = TDMT_VND_SUBMIT, + .pCont = pReq, + .contLen = ntohl(pReq->length), + }; + + ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0); } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 5c609303ff..ebde1c7997 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1,7 +1,7 @@ -#include "ttime.h" -#include "tdatablock.h" #include "executorimpl.h" #include "functionMgt.h" +#include "tdatablock.h" +#include "ttime.h" typedef enum SResultTsInterpType { RESULT_ROW_START_INTERP = 1, @@ -545,7 +545,6 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { } } - static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx, SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep, int32_t order, bool timeWindowInterpo) { @@ -759,10 +758,10 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SIntervalAggOperatorInfo* pInfo = pOperator->info; - int32_t order = TSDB_ORDER_ASC; + int32_t order = TSDB_ORDER_ASC; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -808,7 +807,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); int64_t gid = pBlock->info.groupId; @@ -932,7 +931,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -981,10 +980,10 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr } } static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) { - for ( int i = 0; i < num; i++) { + for (int i = 0; i < num; i++) { if (type == STREAM_INVERT) { fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet)); - } else if (type == STREAM_NORMAL){ + } else if (type == STREAM_NORMAL) { fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet)); } } @@ -992,7 +991,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; - int32_t order = TSDB_ORDER_ASC; + int32_t order = TSDB_ORDER_ASC; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -1038,7 +1037,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); - ASSERT(pInfo->binfo.pRes->info.rows > 0); + // TODO: remove for stream + /*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/ pOperator->status = OP_RES_TO_RETURN; return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; @@ -1070,17 +1070,17 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; // pInfo->execModel = OPTR_EXEC_MODEL_STREAM; - pInfo->execModel = pTaskInfo->execModel; - pInfo->win = pTaskInfo->window; - pInfo->twAggSup = *pTwAggSupp; + pInfo->execModel = pTaskInfo->execModel; + pInfo->win = pTaskInfo->window; + pInfo->twAggSup = *pTwAggSupp; pInfo->primaryTsIndex = primaryTsSlotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -1099,14 +1099,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); - pOperator->name = "TimeIntervalAggOperator"; + pOperator->name = "TimeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); @@ -1118,7 +1118,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* return pOperator; - _error: +_error: destroyIntervalOperatorInfo(pInfo, numOfCols); taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -1131,7 +1131,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } @@ -1177,7 +1177,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr return pOperator; - _error: +_error: destroyIntervalOperatorInfo(pInfo, numOfCols); taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -1321,7 +1321,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { return pSliceInfo->binfo.pRes; } - int32_t order = TSDB_ORDER_ASC; + int32_t order = TSDB_ORDER_ASC; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -1379,7 +1379,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; - _error: +_error: taosMemoryFree(pInfo); taosMemoryFree(pOperator); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; @@ -1402,18 +1402,18 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - pInfo->twAggSup = *pTwAggSup; + pInfo->twAggSup = *pTwAggSup; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->tsSlotId = tsSlotId; - pOperator->name = "StateWindowOperator"; + pInfo->tsSlotId = tsSlotId; + pOperator->name = "StateWindowOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW; pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExpr; - pOperator->numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; - pOperator->info = pInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExpr; + pOperator->numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); @@ -1421,7 +1421,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; - _error: +_error: pTaskInfo->code = TSDB_CODE_SUCCESS; return NULL; } @@ -1432,8 +1432,8 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { } SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, - SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, + STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1453,18 +1453,18 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->tsSlotId = tsSlotId; - pInfo->gap = gap; - pInfo->binfo.pRes = pResBlock; - pInfo->winSup.prevTs = INT64_MIN; - pInfo->reptScan = false; - pOperator->name = "SessionWindowAggOperator"; + pInfo->tsSlotId = tsSlotId; + pInfo->gap = gap; + pInfo->binfo.pRes = pResBlock; + pInfo->winSup.prevTs = INT64_MIN; + pInfo->reptScan = false; + pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW; pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; - pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); @@ -1473,7 +1473,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo code = appendDownstream(pOperator, &downstream, 1); return pOperator; - _error: +_error: if (pInfo != NULL) { destroySWindowOperatorInfo(pInfo, numOfCols); } @@ -1482,4 +1482,4 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; -} \ No newline at end of file +} diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 3b49dca800..237f673a47 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -150,12 +150,12 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in pRes = (SArray*)input; } + if (pRes == NULL || taosArrayGetSize(pRes) == 0) return 0; + // sink if (pTask->sinkType == TASK_SINK__TABLE) { /*blockDebugShowData(pRes);*/ - ASSERT(pTask->tbSink.pTSchema); - SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid); - tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); + pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes); } else if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); // From b78918c72a5b98dbe87d95a85cb0723ad97a1db0 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 8 May 2022 03:16:00 +0800 Subject: [PATCH 08/11] test(stream): add stream case --- tests/script/jenkins/basic.txt | 3 ++ tests/script/tsim/tstream/basic0.sim | 71 ++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 tests/script/tsim/tstream/basic0.sim diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index b95e822df3..f4a7a1d822 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -65,6 +65,9 @@ ./test.sh -f tsim/tmq/basic3.sim ./test.sh -f tsim/tmq/basic4.sim +# ---- tstream +./test.sh -f tstream/basic0.sim + # --- stable ./test.sh -f tsim/stable/disk.sim ./test.sh -f tsim/stable/dnode3.sim diff --git a/tests/script/tsim/tstream/basic0.sim b/tests/script/tsim/tstream/basic0.sim new file mode 100644 index 0000000000..b3d51cdcc8 --- /dev/null +++ b/tests/script/tsim/tstream/basic0.sim @@ -0,0 +1,71 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database d0 vgroups 1 +sql show databases +if $rows != 3 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use d0 + +print =============== create super table, include column type for count/sum/min/max/first +sql create table if not exists stb (ts timestamp, k int) tags (a int) + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +sql create table ct1 using stb tags(1000) +sql create table ct2 using stb tags(2000) +sql create table ct3 using stb tags(3000) + +sql show tables +if $rows != 3 then + return -1 +endi + +sql create stream s1 into outstb as select _wstartts, min(k), max(k), sum(k) from ct1 interval(10m) + +sql show stables +if $rows != 2 then + return -1 +endi + +print =============== insert data + +sql insert into ct1 values(now+0s, 234) + +sleep 100 + +#=================================================================== +#=================================================================== +print =============== query data from child table +sql select `_wstartts`,`min(k)`,`max(k)`,`sum(k)` from outstb +print rows: $rows +print $data00 $data01 $data02 $data03 +if $rows != 1 then + return -1 +endi + +if $data01 != 234 then + return -1 +endi + +if $data02 != 234 then + return -1 +endi + +if $data03 != 234 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From 72437147d9c01611cee7bef2be7d727f517b7bb3 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 8 May 2022 03:40:52 +0800 Subject: [PATCH 09/11] test(stream): add stream case --- tests/script/jenkins/basic.txt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index f4a7a1d822..8cce169d99 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -59,15 +59,16 @@ # ---- table ./test.sh -f tsim/table/basic1.sim +# ---- tstream +./test.sh -f tsim/tstream/basic0.sim + + # ---- tmq ./test.sh -f tsim/tmq/basic1.sim ./test.sh -f tsim/tmq/basic2.sim ./test.sh -f tsim/tmq/basic3.sim ./test.sh -f tsim/tmq/basic4.sim -# ---- tstream -./test.sh -f tstream/basic0.sim - # --- stable ./test.sh -f tsim/stable/disk.sim ./test.sh -f tsim/stable/dnode3.sim From 597f42aa5e43f9259ebe75c1d24fed192718c435 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 8 May 2022 03:19:57 +0000 Subject: [PATCH 10/11] fix auto-create table req --- source/dnode/vnode/src/meta/metaTable.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 3c140cc668..7663047429 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -141,16 +141,19 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { goto _err; } - // preprocess req - pReq->uid = tGenIdPI64(); - pReq->ctime = taosGetTimestampMs(); - // validate req metaReaderInit(&mr, pMeta, 0); if (metaGetTableEntryByName(&mr, pReq->name) == 0) { + pReq->uid = mr.me.uid; + if (pReq->type == TSDB_CHILD_TABLE) { + pReq->ctb.suid = mr.me.ctbEntry.suid; + } terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST; metaReaderClear(&mr); return -1; + } else { + pReq->uid = tGenIdPI64(); + pReq->ctime = taosGetTimestampMs(); } metaReaderClear(&mr); From 68b9424f8a1e014965d64d4e7a7becd60eb0bb32 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 8 May 2022 11:58:19 +0800 Subject: [PATCH 11/11] test(stream): more stream case --- source/dnode/vnode/src/tq/tq.c | 2 +- tests/script/tsim/tstream/basic0.sim | 78 ++++++++++++++++++++++++++-- 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b7378e44f2..b8d6e84b1c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -890,7 +890,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ASSERT(pTask->tbSink.pTSchema); SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pVnode->config.vgId); - tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); + /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/ // build write msg SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, diff --git a/tests/script/tsim/tstream/basic0.sim b/tests/script/tsim/tstream/basic0.sim index b3d51cdcc8..2a1bd14531 100644 --- a/tests/script/tsim/tstream/basic0.sim +++ b/tests/script/tsim/tstream/basic0.sim @@ -33,7 +33,7 @@ if $rows != 3 then return -1 endi -sql create stream s1 into outstb as select _wstartts, min(k), max(k), sum(k) from ct1 interval(10m) +sql create stream s1 into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m) sql show stables if $rows != 2 then @@ -42,14 +42,13 @@ endi print =============== insert data -sql insert into ct1 values(now+0s, 234) - +sql insert into ct1 values('2022-05-08 03:42:00.000', 234) sleep 100 -#=================================================================== #=================================================================== print =============== query data from child table -sql select `_wstartts`,`min(k)`,`max(k)`,`sum(k)` from outstb + +sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb print rows: $rows print $data00 $data01 $data02 $data03 if $rows != 1 then @@ -68,4 +67,73 @@ if $data03 != 234 then return -1 endi +#=================================================================== +print =============== insert data + +sql insert into ct1 values('2022-05-08 03:43:00.000', -111) +sleep 100 + +#=================================================================== +print =============== query data from child table + +sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb +print rows: $rows +print $data00 $data01 $data02 $data03 +if $rows != 1 then + return -1 +endi + +if $data01 != -111 then + return -1 +endi + +if $data02 != 234 then + return -1 +endi + +if $data03 != 123 then + return -1 +endi + +#=================================================================== +print =============== insert data + +sql insert into ct1 values('2022-05-08 03:53:00.000', 789) +sleep 100 + +#=================================================================== +print =============== query data from child table + +sql select `_wstartts`,`min(k)`,`max(k)`,sum_alias from outstb +print rows: $rows +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +if $rows != 2 then + return -1 +endi + +if $data01 != -111 then + return -1 +endi + +if $data02 != 234 then + return -1 +endi + +if $data03 != 123 then + return -1 +endi + +if $data11 != 789 then + return -1 +endi + +if $data12 != 789 then + return -1 +endi + +if $data13 != 789 then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT