From 2f9aeeb5fc1b821276e3cc8adb656e6fa9d34dfd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 9 Oct 2022 16:54:27 +0800 Subject: [PATCH] feat(stream): support define table name --- include/common/tcommon.h | 4 +- include/common/tdatablock.h | 3 +- include/common/tmsg.h | 10 +- include/libs/executor/executor.h | 9 +- source/client/src/clientRawBlockWrite.c | 69 +++--- source/common/src/tdatablock.c | 37 ++++ source/common/src/tmsg.c | 4 +- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 16 +- source/dnode/vnode/src/meta/metaSnapshot.c | 200 +++++++++--------- source/dnode/vnode/src/meta/metaTable.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 5 +- source/dnode/vnode/src/tq/tqSink.c | 69 +++--- source/libs/executor/inc/executil.h | 4 +- source/libs/executor/inc/executorimpl.h | 6 +- source/libs/executor/src/executil.c | 36 ++-- source/libs/executor/src/executor.c | 60 +----- source/libs/executor/src/executorimpl.c | 8 +- source/libs/executor/src/groupoperator.c | 35 +++ source/libs/executor/src/scanoperator.c | 22 +- source/libs/executor/src/timewindowoperator.c | 19 +- source/libs/parser/src/parInsert.c | 8 +- source/libs/parser/src/parTranslater.c | 8 +- source/libs/stream/src/streamData.c | 1 + source/libs/stream/src/streamDispatch.c | 9 +- source/libs/stream/src/streamState.c | 1 + 26 files changed, 357 insertions(+), 289 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2add3332ab..670ce29a2d 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -157,7 +157,7 @@ typedef struct SDataBlockInfo { int32_t rowSize; uint64_t uid; // the uid of table, from which current data block comes uint16_t blockId; // block id, generated by physical planner - uint64_t groupId; // no need to serialize + uint64_t groupId; int16_t hasVarCol; uint32_t capacity; // TODO: optimize and remove following @@ -166,6 +166,8 @@ typedef struct SDataBlockInfo { EStreamType type; // used for stream, do not serialize STimeWindow calWin; // used for stream, do not serialize TSKEY watermark; // used for stream + + char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream } SDataBlockInfo; typedef struct SSDataBlock { diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 73d043b2d0..24dfa5958d 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -235,7 +235,8 @@ void blockDataFreeRes(SSDataBlock* pBlock); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); SSDataBlock* createSpecialDataBlock(EStreamType type); -int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); +SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx); +int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7aec00c7c1..9f21ee007f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -676,7 +676,6 @@ typedef struct { col_id_t colId; int16_t slotId; }; - bool output; // TODO remove it later int8_t type; int32_t bytes; @@ -1395,8 +1394,9 @@ typedef struct { int32_t numOfCols; int64_t skey; int64_t ekey; - int64_t version; // for stream - TSKEY watermark; // for stream + int64_t version; // for stream + TSKEY watermark; // for stream + char parTbName[TSDB_TABLE_NAME_LEN]; // for stream char data[]; } SRetrieveTableRsp; @@ -2025,7 +2025,7 @@ typedef struct SVCreateTbReq { int8_t type; union { struct { - char* name; // super table name + char* stbName; // super table name uint8_t tagNum; tb_uid_t suid; SArray* tagName; @@ -2045,7 +2045,7 @@ static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) { taosMemoryFreeClear(req->comment); if (req->type == TSDB_CHILD_TABLE) { taosMemoryFreeClear(req->ctb.pTag); - taosMemoryFreeClear(req->ctb.name); + taosMemoryFreeClear(req->ctb.stbName); taosArrayDestroy(req->ctb.tagName); req->ctb.tagName = NULL; } else if (req->type == TSDB_NORMAL_TABLE) { diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 78eedaf921..b4b60f804d 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -89,13 +89,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n */ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); -/** - * @brief Cleanup SSDataBlock for StreamScanInfo - * - * @param tinfo - */ -void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo); - /** * Update the table id list, add or remove. * @@ -137,7 +130,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @return */ -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch *pLocal); +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); /** diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index eb7b45cc05..a257335931 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -225,10 +225,10 @@ _err: return string; } -static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq){ - STag* pTag = (STag*)pCreateReq->ctb.pTag; - char* sname = pCreateReq->ctb.name; - char* name = pCreateReq->name; +static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { + STag* pTag = (STag*)pCreateReq->ctb.pTag; + char* sname = pCreateReq->ctb.stbName; + char* name = pCreateReq->name; SArray* tagName = pCreateReq->ctb.tagName; int64_t id = pCreateReq->uid; uint8_t tagNum = pCreateReq->ctb.tagNum; @@ -302,14 +302,14 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq){ cJSON_AddItemToArray(tags, tag); } - end: +end: cJSON_AddItemToObject(json, "tags", tags); taosArrayDestroy(pTagVals); } static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { - char* string = NULL; - cJSON* json = cJSON_CreateObject(); + char* string = NULL; + cJSON* json = cJSON_CreateObject(); if (json == NULL) { return NULL; } @@ -325,7 +325,7 @@ static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { buildChildElement(json, pCreateReq); cJSON* createList = cJSON_CreateArray(); - for(int i = 0; nReqs > 1 && i < nReqs; i++){ + for (int i = 0; nReqs > 1 && i < nReqs; i++) { cJSON* create = cJSON_CreateObject(); buildChildElement(create, pCreateReq + i); cJSON_AddItemToArray(createList, create); @@ -355,7 +355,8 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { if (pCreateReq->type == TSDB_CHILD_TABLE) { string = buildCreateCTableJson(req.pReqs, req.nReqs); } else if (pCreateReq->type == TSDB_NORMAL_TABLE) { - string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); + string = + buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); } } @@ -374,15 +375,15 @@ _exit: static char* processAutoCreateTable(STaosxRsp* rsp) { ASSERT(rsp->createTableNum != 0); - SDecoder* decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder)); - SVCreateTbReq* pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq)); - char* string = NULL; + SDecoder* decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder)); + SVCreateTbReq* pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq)); + char* string = NULL; // loop to create table for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) { // decode void** data = taosArrayGet(rsp->createTableReq, iReq); - int32_t *len = taosArrayGet(rsp->createTableLen, iReq); + int32_t* len = taosArrayGet(rsp->createTableLen, iReq); tDecoderInit(&decoder[iReq], *data, *len); if (tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq) < 0) { goto _exit; @@ -393,7 +394,7 @@ static char* processAutoCreateTable(STaosxRsp* rsp) { string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); _exit: - for(int i = 0; i < rsp->createTableNum; i++){ + for (int i = 0; i < rsp->createTableNum; i++) { tDecoderClear(&decoder[i]); taosMemoryFreeClear(pCreateReq[i].comment); if (pCreateReq[i].type == TSDB_CHILD_TABLE) { @@ -828,10 +829,10 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { if (pCreateReq->type == TSDB_CHILD_TABLE) { STableMeta* pTableMeta = NULL; SName sName = {0}; - toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.name, &sName); + toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName); code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta); if (code != TSDB_CODE_SUCCESS) { - uError("taosCreateTable:catalogGetTableMeta failed. table name: %s", pCreateReq->ctb.name); + uError("taosCreateTable:catalogGetTableMeta failed. table name: %s", pCreateReq->ctb.stbName); goto end; } @@ -1661,12 +1662,12 @@ end: } static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { - int32_t code = TSDB_CODE_SUCCESS; - SHashObj* pVgHash = NULL; - SQuery* pQuery = NULL; - SMqTaosxRspObj rspObj = {0}; - SDecoder decoder = {0}; - STableMeta* pTableMeta = NULL; + int32_t code = TSDB_CODE_SUCCESS; + SHashObj* pVgHash = NULL; + SQuery* pQuery = NULL; + SMqTaosxRspObj rspObj = {0}; + SDecoder decoder = {0}; + STableMeta* pTableMeta = NULL; terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); @@ -1745,13 +1746,13 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) // find schema data info int32_t schemaLen = 0; - void* schemaData = NULL; - for(int j = 0; j < rspObj.rsp.createTableNum; j++){ - void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j); + void* schemaData = NULL; + for (int j = 0; j < rspObj.rsp.createTableNum; j++) { + void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j); int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j); - SDecoder decoderTmp = {0}; - SVCreateTbReq pCreateReq = {0}; + SDecoder decoderTmp = {0}; + SVCreateTbReq pCreateReq = {0}; tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { @@ -1761,11 +1762,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } - ASSERT (pCreateReq.type == TSDB_CHILD_TABLE); - if(strcmp(tbName, pCreateReq.name) == 0){ + ASSERT(pCreateReq.type == TSDB_CHILD_TABLE); + if (strcmp(tbName, pCreateReq.name) == 0) { schemaLen = *lenTmp; schemaData = *dataTmp; - strcpy(pName.tname, pCreateReq.ctb.name); + strcpy(pName.tname, pCreateReq.ctb.stbName); tDecoderClear(&decoderTmp); taosMemoryFreeClear(pCreateReq.comment); taosArrayDestroy(pCreateReq.ctb.tagName); @@ -1843,8 +1844,8 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) uint64_t uid = pTableMeta->uid; int16_t sver = pTableMeta->sversion; - void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); - if(schemaData){ + void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); + if (schemaData) { memcpy(blkSchema, schemaData, schemaLen); } STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); @@ -1952,7 +1953,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; - end: +end: tDeleteSTaosxRsp(&rspObj.rsp); rspObj.resInfo.pRspMsg = NULL; doFreeReqResultInfo(&rspObj.resInfo); @@ -1969,7 +1970,7 @@ char* tmq_get_json_meta(TAOS_RES* res) { return NULL; } - if(TD_RES_TMQ_METADATA(res)){ + if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj* pMetaDataRspObj = (SMqTaosxRspObj*)res; return processAutoCreateTable(&pMetaDataRspObj->rsp); } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 23cc868658..c525759ac7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1346,6 +1346,43 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) { return pBlock; } +SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) { + if (pDataBlock == NULL) { + return NULL; + } + + SSDataBlock* pBlock = createDataBlock(); + pBlock->info = pDataBlock->info; + pBlock->info.rows = 0; + pBlock->info.capacity = 0; + + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; + blockDataAppendColInfo(pBlock, &colInfo); + } + + int32_t code = blockDataEnsureCapacity(pBlock, 1); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + blockDataDestroy(pBlock); + return NULL; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); + void* pData = colDataGetData(pSrc, rowIdx); + bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL); + colDataAppend(pDst, 0, pData, isNull); + } + + pBlock->info.rows = 1; + + return pBlock; +} + SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { if (pDataBlock == NULL) { return NULL; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f4ffc4c996..55e5616dd5 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5075,7 +5075,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { } if (pReq->type == TSDB_CHILD_TABLE) { - if (tEncodeCStr(pCoder, pReq->ctb.name) < 0) return -1; + if (tEncodeCStr(pCoder, pReq->ctb.stbName) < 0) return -1; if (tEncodeU8(pCoder, pReq->ctb.tagNum) < 0) return -1; if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1; if (tEncodeTag(pCoder, (const STag *)pReq->ctb.pTag) < 0) return -1; @@ -5112,7 +5112,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { } if (pReq->type == TSDB_CHILD_TABLE) { - if (tDecodeCStr(pCoder, &pReq->ctb.name) < 0) return -1; + if (tDecodeCStr(pCoder, &pReq->ctb.stbName) < 0) return -1; if (tDecodeU8(pCoder, &pReq->ctb.tagNum) < 0) return -1; if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1; if (tDecodeTag(pCoder, (STag **)&pReq->ctb.pTag) < 0) return -1; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c3d03a6c5e..9cc920de04 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -639,6 +639,7 @@ typedef struct { char* physicalPlan; SArray* tasks; // SArray> SSchemaWrapper outputSchema; + SSchemaWrapper tagSchema; } SStreamObj; int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ea889e7001..eb6730e217 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -343,6 +343,20 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, goto FAIL; } + pObj->tagSchema.nCols = pCreate->numOfTags; + if (pCreate->numOfTags) { + pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema)); + } + ASSERT(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags)); + for (int32_t i = 0; i < pCreate->numOfTags; i++) { + SField *pField = taosArrayGet(pCreate->pTags, i); + pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1; + pObj->tagSchema.pSchema[i].bytes = pField->bytes; + pObj->tagSchema.pSchema[i].flags = pField->flags; + pObj->tagSchema.pSchema[i].type = pField->type; + memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN); + } + FAIL: if (pAst != NULL) nodesDestroyNode(pAst); if (pPlan != NULL) qDestroyQueryPlan(pPlan); @@ -673,7 +687,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); goto _OVER; } - mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); // hack way + mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); // hack way mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); // create stb for stream diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index 9fdbe50f88..a40bbd7d87 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -196,11 +196,11 @@ _err: return code; } -typedef struct STableInfoForChildTable{ - char *tableName; - SSchemaWrapper *schemaRow; - SSchemaWrapper *tagRow; -}STableInfoForChildTable; +typedef struct STableInfoForChildTable { + char* tableName; + SSchemaWrapper* schemaRow; + SSchemaWrapper* tagRow; +} STableInfoForChildTable; static void destroySTableInfoForChildTable(void* data) { STableInfoForChildTable* pData = (STableInfoForChildTable*)data; @@ -209,35 +209,35 @@ static void destroySTableInfoForChildTable(void* data) { tDeleteSSchemaWrapper(pData->tagRow); } -static void MoveToSnapShotVersion(SSnapContext* ctx){ +static void MoveToSnapShotVersion(SSnapContext* ctx) { tdbTbcClose(ctx->pCur); tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL); STbDbKey key = {.version = ctx->snapVersion, .uid = INT64_MAX}; - int c = 0; + int c = 0; tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c); - if(c < 0){ + if (c < 0) { tdbTbcMoveToPrev(ctx->pCur); } } -static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid){ +static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) { tdbTbcClose(ctx->pCur); tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL); STbDbKey key = {.version = ver, .uid = uid}; - int c = 0; + int c = 0; tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c); return c; } -static void MoveToFirst(SSnapContext* ctx){ +static void MoveToFirst(SSnapContext* ctx) { tdbTbcClose(ctx->pCur); tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL); tdbTbcMoveToFirst(ctx->pCur); } -static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){ +static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) { STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t)); - if(data){ + if (data) { return; } STableInfoForChildTable dataTmp = {0}; @@ -248,9 +248,10 @@ static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){ taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); } -int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext** ctxRet){ +int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, + SSnapContext** ctxRet) { SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext)); - if(ctx == NULL) return -1; + if (ctx == NULL) return -1; *ctxRet = ctx; ctx->pMeta = pMeta; ctx->snapVersion = snapVersion; @@ -259,36 +260,37 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t ctx->queryMetaOrData = withMeta; ctx->withMeta = withMeta; ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if(ctx->idVersion == NULL){ + if (ctx->idVersion == NULL) { return -1; } ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if(ctx->suidInfo == NULL){ + if (ctx->suidInfo == NULL) { return -1; } taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable); ctx->index = 0; ctx->idList = taosArrayInit(100, sizeof(int64_t)); - void *pKey = NULL; - void *pVal = NULL; + void* pKey = NULL; + void* pVal = NULL; int vLen = 0, kLen = 0; metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion); MoveToFirst(ctx); - while(1){ + while (1) { int32_t ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen); if (ret < 0) break; - STbDbKey *tmp = (STbDbKey*)pKey; + STbDbKey* tmp = (STbDbKey*)pKey; if (tmp->version > ctx->snapVersion) break; SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t)); - if(idData) { + if (idData) { continue; } - if (tdbTbGet(pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) < 0) { // check if table exist for now, need optimize later + if (tdbTbGet(pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) < + 0) { // check if table exist for now, need optimize later continue; } @@ -296,9 +298,9 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t SMetaEntry me = {0}; tDecoderInit(&dc, pVal, vLen); metaDecodeEntry(&dc, &me); - if(ctx->subType == TOPIC_SUB_TYPE__TABLE){ + if (ctx->subType == TOPIC_SUB_TYPE__TABLE) { if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) || - (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)){ + (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) { tDecoderClear(&dc); continue; } @@ -314,13 +316,13 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t taosHashClear(ctx->idVersion); MoveToSnapShotVersion(ctx); - while(1){ + while (1) { int32_t ret = tdbTbcPrev(ctx->pCur, &pKey, &kLen, &pVal, &vLen); if (ret < 0) break; - STbDbKey *tmp = (STbDbKey*)pKey; - SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t)); - if(idData){ + STbDbKey* tmp = (STbDbKey*)pKey; + SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t)); + if (idData) { continue; } SIdInfo info = {.version = tmp->version, .index = 0}; @@ -330,27 +332,28 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t SMetaEntry me = {0}; tDecoderInit(&dc, pVal, vLen); metaDecodeEntry(&dc, &me); - if(ctx->subType == TOPIC_SUB_TYPE__TABLE){ + if (ctx->subType == TOPIC_SUB_TYPE__TABLE) { if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) || - (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)){ + (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) { tDecoderClear(&dc); continue; } } - if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) - || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) { + if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) || + (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) { saveSuperTableInfoForChildTable(&me, ctx->suidInfo); } tDecoderClear(&dc); } - for(int i = 0; i < taosArrayGetSize(ctx->idList); i++){ - int64_t *uid = taosArrayGet(ctx->idList, i); + for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) { + int64_t* uid = taosArrayGet(ctx->idList, i); SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t)); ASSERT(idData); idData->index = i; - metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, idData->index); + metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, + idData->index); } tdbFree(pKey); @@ -358,7 +361,7 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t return TDB_CODE_SUCCESS; } -int32_t destroySnapContext(SSnapContext* ctx){ +int32_t destroySnapContext(SSnapContext* ctx) { tdbTbcClose(ctx->pCur); taosArrayDestroy(ctx->idList); taosHashCleanup(ctx->idVersion); @@ -367,12 +370,12 @@ int32_t destroySnapContext(SSnapContext* ctx){ return 0; } -static int32_t buildNormalChildTableInfo(SVCreateTbReq *req, void **pBuf, int32_t *contLen){ - int32_t ret = 0; +static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) { + int32_t ret = 0; SVCreateTbBatchReq reqs = {0}; reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq)); - if (NULL == reqs.pArray){ + if (NULL == reqs.pArray) { ret = -1; goto end; } @@ -380,7 +383,7 @@ static int32_t buildNormalChildTableInfo(SVCreateTbReq *req, void **pBuf, int32_ reqs.nReqs = 1; tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret); - if(ret < 0){ + if (ret < 0) { ret = -1; goto end; } @@ -405,7 +408,7 @@ end: return ret; } -static int32_t buildSuperTableInfo(SVCreateStbReq *req, void **pBuf, int32_t *contLen){ +static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) { int32_t ret = 0; tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret); if (ret < 0) { @@ -418,7 +421,7 @@ static int32_t buildSuperTableInfo(SVCreateStbReq *req, void **pBuf, int32_t *co return -1; } - SEncoder encoder = {0}; + SEncoder encoder = {0}; tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen); if (tEncodeSVCreateStbReq(&encoder, req) < 0) { taosMemoryFreeClear(*pBuf); @@ -429,16 +432,16 @@ static int32_t buildSuperTableInfo(SVCreateStbReq *req, void **pBuf, int32_t *co return 0; } -int32_t setForSnapShot(SSnapContext* ctx, int64_t uid){ +int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) { int c = 0; - if(uid == 0){ + if (uid == 0) { ctx->index = 0; return c; } SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t)); - if(!idInfo){ + if (!idInfo) { return -1; } @@ -447,17 +450,17 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid){ return c; } -int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid){ +int32_t getMetafromSnapShot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) { int32_t ret = 0; - void *pKey = NULL; - void *pVal = NULL; - int vLen = 0, kLen = 0; + void* pKey = NULL; + void* pVal = NULL; + int vLen = 0, kLen = 0; - while(1){ - if(ctx->index >= taosArrayGetSize(ctx->idList)){ + while (1) { + if (ctx->index >= taosArrayGetSize(ctx->idList)) { metaDebug("tmqsnap get meta end"); ctx->index = 0; - ctx->queryMetaOrData = false; // change to get data + ctx->queryMetaOrData = false; // change to get data return 0; } @@ -468,7 +471,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in *uid = *uidTmp; ret = MoveToPosition(ctx, idInfo->version, *uidTmp); - if(ret == 0){ + if (ret == 0) { break; } metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version); @@ -479,10 +482,10 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in SMetaEntry me = {0}; tDecoderInit(&dc, pVal, vLen); metaDecodeEntry(&dc, &me); - metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index-1); + metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1); - if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) - || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) { + if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) || + (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) { SVCreateStbReq req = {0}; req.name = me.name; req.suid = me.uid; @@ -494,9 +497,10 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in ret = buildSuperTableInfo(&req, pBuf, contLen); *type = TDMT_VND_CREATE_STB; - } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) - || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) { - STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); + } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) || + (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) { + STableInfoForChildTable* data = + (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); ASSERT(data); SVCreateTbReq req = {0}; @@ -506,16 +510,16 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in req.commentLen = -1; req.ctb.suid = me.ctbEntry.suid; req.ctb.tagNum = data->tagRow->nCols; - req.ctb.name = data->tableName; + req.ctb.stbName = data->tableName; SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN); - STag* p = (STag*)me.ctbEntry.pTags; - if(tTagIsJson(p)){ + STag* p = (STag*)me.ctbEntry.pTags; + if (tTagIsJson(p)) { if (p->nTag != 0) { SSchema* schema = &data->tagRow->pSchema[0]; taosArrayPush(tagName, schema->name); } - }else{ + } else { SArray* pTagVals = NULL; if (tTagToValArray((const STag*)p, &pTagVals) != 0) { ASSERT(0); @@ -523,36 +527,36 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in int16_t nCols = taosArrayGetSize(pTagVals); for (int j = 0; j < nCols; ++j) { STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j); - for(int i = 0; i < data->tagRow->nCols; i++){ - SSchema *schema = &data->tagRow->pSchema[i]; - if(schema->colId == pTagVal->cid){ + for (int i = 0; i < data->tagRow->nCols; i++) { + SSchema* schema = &data->tagRow->pSchema[i]; + if (schema->colId == pTagVal->cid) { taosArrayPush(tagName, schema->name); } } } taosArrayDestroy(pTagVals); } -// SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t)); -// if(sidInfo->version >= idInfo->version){ -// // need parse tag -// STag* p = (STag*)me.ctbEntry.pTags; -// SArray* pTagVals = NULL; -// if (tTagToValArray((const STag*)p, &pTagVals) != 0) { -// } -// -// int16_t nCols = taosArrayGetSize(pTagVals); -// for (int j = 0; j < nCols; ++j) { -// STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j); -// } -// }else{ - req.ctb.pTag = me.ctbEntry.pTags; -// } + // SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t)); + // if(sidInfo->version >= idInfo->version){ + // // need parse tag + // STag* p = (STag*)me.ctbEntry.pTags; + // SArray* pTagVals = NULL; + // if (tTagToValArray((const STag*)p, &pTagVals) != 0) { + // } + // + // int16_t nCols = taosArrayGetSize(pTagVals); + // for (int j = 0; j < nCols; ++j) { + // STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j); + // } + // }else{ + req.ctb.pTag = me.ctbEntry.pTags; + // } req.ctb.tagName = tagName; ret = buildNormalChildTableInfo(&req, pBuf, contLen); *type = TDMT_VND_CREATE_TABLE; taosArrayDestroy(tagName); - } else if(ctx->subType == TOPIC_SUB_TYPE__DB){ + } else if (ctx->subType == TOPIC_SUB_TYPE__DB) { SVCreateTbReq req = {0}; req.type = TSDB_NORMAL_TABLE; req.name = me.name; @@ -561,7 +565,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in req.ntb.schemaRow = me.ntbEntry.schemaRow; ret = buildNormalChildTableInfo(&req, pBuf, contLen); *type = TDMT_VND_CREATE_TABLE; - } else{ + } else { ASSERT(0); } tDecoderClear(&dc); @@ -569,14 +573,14 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in return ret; } -SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx){ +SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx) { SMetaTableInfo result = {0}; - void *pKey = NULL; - void *pVal = NULL; - int vLen, kLen; + void* pKey = NULL; + void* pVal = NULL; + int vLen, kLen; - while(1){ - if(ctx->index >= taosArrayGetSize(ctx->idList)){ + while (1) { + if (ctx->index >= taosArrayGetSize(ctx->idList)) { metaDebug("tmqsnap get uid info end"); return result; } @@ -586,7 +590,7 @@ SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx){ ASSERT(idInfo); int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp); - if(ret != 0) { + if (ret != 0) { metaDebug("tmqsnap getUidfromSnapShot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp, idInfo->version); continue; } @@ -595,10 +599,11 @@ SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx){ SMetaEntry me = {0}; tDecoderInit(&dc, pVal, vLen); metaDecodeEntry(&dc, &me); - metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index-1); + metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1); - if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE){ - STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); + if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) { + STableInfoForChildTable* data = + (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); result.uid = me.uid; result.suid = me.ctbEntry.suid; result.schema = tCloneSSchemaWrapper(data->schemaRow); @@ -612,15 +617,16 @@ SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx){ result.schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow); tDecoderClear(&dc); break; - } else if(ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid) { - STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); + } else if (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid) { + STableInfoForChildTable* data = + (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); result.uid = me.uid; result.suid = me.ctbEntry.suid; strcpy(result.tbName, me.name); result.schema = tCloneSSchemaWrapper(data->schemaRow); tDecoderClear(&dc); break; - } else{ + } else { metaDebug("tmqsnap get uid continue"); tDecoderClear(&dc); continue; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 932afe8937..361e453303 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -382,7 +382,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe } if (pReq->type == TSDB_CHILD_TABLE) { - tb_uid_t suid = metaGetTableEntryUidByName(pMeta, pReq->ctb.name); + tb_uid_t suid = metaGetTableEntryUidByName(pMeta, pReq->ctb.stbName); if (suid != pReq->ctb.suid) { terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; return -1; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 27da9da02c..1924f803d6 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -326,7 +326,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; return TSDB_CODE_FAILED; } - SReadHandle handle = { .meta = pVnode->pMeta, @@ -692,7 +691,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma while (1) { uint64_t ts; - bool hasMore = false; + bool hasMore = false; int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL); if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { @@ -1821,11 +1820,9 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { goto _err; } if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { - tdCleanupStreamInputDataBlock(taskInfo); goto _err; } - tdCleanupStreamInputDataBlock(taskInfo); smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch finished", SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay); } else { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 16c2a5d033..6e7428b662 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -48,8 +48,9 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl return 0; } -SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, - int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) { +SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, + SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, + SBatchDeleteReq* pDeleteReq) { SSubmitReq* ret = NULL; SArray* schemaReqs = NULL; SArray* schemaReqSz = NULL; @@ -89,40 +90,46 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem return NULL; } - SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; strcpy(tagNameStr, "group_id"); taosArrayPush(tagName, tagNameStr); -// STag* pTag = NULL; -// taosArrayClear(tagArray); -// SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); -// for(int j = 0; j < pTagSchemaWrapper->nCols; j++){ -// STagVal tagVal = { -// .cid = pTagSchemaWrapper->pSchema[j].colId, -// .type = pTagSchemaWrapper->pSchema[j].type, -// .i64 = (int64_t)pDataBlock->info.groupId, -// }; -// taosArrayPush(tagArray, &tagVal); -// taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name); -// } -// -// tTagNew(tagArray, 1, false, &pTag); -// if (pTag == NULL) { -// terrno = TSDB_CODE_OUT_OF_MEMORY; -// taosArrayDestroy(tagArray); -// taosArrayDestroy(tagName); -// return NULL; -// } + // STag* pTag = NULL; + // taosArrayClear(tagArray); + // SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + // for(int j = 0; j < pTagSchemaWrapper->nCols; j++){ + // STagVal tagVal = { + // .cid = pTagSchemaWrapper->pSchema[j].colId, + // .type = pTagSchemaWrapper->pSchema[j].type, + // .i64 = (int64_t)pDataBlock->info.groupId, + // }; + // taosArrayPush(tagArray, &tagVal); + // taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name); + // } + // + // tTagNew(tagArray, 1, false, &pTag); + // if (pTag == NULL) { + // terrno = TSDB_CODE_OUT_OF_MEMORY; + // taosArrayDestroy(tagArray); + // taosArrayDestroy(tagName); + // return NULL; + // } SVCreateTbReq createTbReq = {0}; - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - - createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); - createTbReq.ctb.name = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); createTbReq.flags = 0; createTbReq.type = TSDB_CHILD_TABLE; + + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); + + if (pDataBlock->info.parTbName[0]) { + createTbReq.name = strdup(pDataBlock->info.parTbName); + } else { + createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); + } + createTbReq.ctb.suid = suid; createTbReq.ctb.pTag = (uint8_t*)pTag; createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); @@ -261,8 +268,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ASSERT(pTask->tbSink.pTSchema); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true, pTask->tbSink.stbUid, - pTask->tbSink.stbFullName, &deleteReq); + SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true, + pTask->tbSink.stbUid, pTask->tbSink.stbFullName, &deleteReq); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 9dba29811e..4e960afdb1 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -74,7 +74,8 @@ typedef struct SResultRowPosition { typedef struct SResKeyPos { SResultRowPosition pos; uint64_t groupId; - char key[]; + // char parTbName[TSDB_TABLE_NAME_LEN]; + char key[]; } SResKeyPos; typedef struct SResultRowInfo { @@ -123,6 +124,7 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList); SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type); +void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 897015c4d3..4edb4a1f97 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -460,6 +460,8 @@ typedef struct SPartitionBySupporter { typedef struct SPartitionDataInfo { uint64_t groupId; + char* tbname; + SArray* tags; SArray* rowIds; } SPartitionDataInfo; @@ -617,6 +619,7 @@ typedef struct SStreamIntervalOperatorInfo { SArray* pChildren; SStreamState* pState; SWinKey delKey; + SHashObj* pGroupIdTbNameMap; // uint64_t -> char[TSDB_TABLE_NAME_LEN] } SStreamIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -771,6 +774,7 @@ typedef struct SStreamPartitionOperatorInfo { SOptrBasicInfo binfo; SPartitionBySupporter partitionSup; SExprSupp scalarSup; + SExprSupp tbnameCalSup; SHashObj* pPartitions; void* parIte; SSDataBlock* pInputDataBlock; @@ -1085,7 +1089,7 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol bool groupbyTbname(SNodeList* pGroupList); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); -int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, +int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index ee2aa5c3fa..9ce6275d84 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1124,45 +1124,45 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDa return pCol; } -void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) { +void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode)); pExp->pExpr->_function.num = 1; pExp->pExpr->_function.functionId = -1; - int32_t type = nodeType(pTargetNode->pExpr); + int32_t type = nodeType(pNode); // it is a project query, or group by column if (type == QUERY_NODE_COLUMN) { pExp->pExpr->nodeType = QUERY_NODE_COLUMN; - SColumnNode* pColNode = (SColumnNode*)pTargetNode->pExpr; + SColumnNode* pColNode = (SColumnNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); pExp->base.numOfParams = 1; SDataType* pType = &pColNode->node.resType; - pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, - pType->precision, pColNode->colName); + pExp->base.resSchema = + createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName); pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType); pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; } else if (type == QUERY_NODE_VALUE) { pExp->pExpr->nodeType = QUERY_NODE_VALUE; - SValueNode* pValNode = (SValueNode*)pTargetNode->pExpr; + SValueNode* pValNode = (SValueNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); pExp->base.numOfParams = 1; SDataType* pType = &pValNode->node.resType; - pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, - pType->precision, pValNode->node.aliasName); + pExp->base.resSchema = + createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName); pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE; nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param); } else if (type == QUERY_NODE_FUNCTION) { pExp->pExpr->nodeType = QUERY_NODE_FUNCTION; - SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; + SFunctionNode* pFuncNode = (SFunctionNode*)pNode; SDataType* pType = &pFuncNode->node.resType; - pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, - pType->precision, pFuncNode->node.aliasName); + pExp->base.resSchema = + createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName); pExp->pExpr->_function.functionId = pFuncNode->funcId; pExp->pExpr->_function.pFunctNode = pFuncNode; @@ -1204,20 +1204,24 @@ void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) { } } else if (type == QUERY_NODE_OPERATOR) { pExp->pExpr->nodeType = QUERY_NODE_OPERATOR; - SOperatorNode* pNode = (SOperatorNode*)pTargetNode->pExpr; + SOperatorNode* pOpNode = (SOperatorNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); pExp->base.numOfParams = 1; - SDataType* pType = &pNode->node.resType; - pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, - pType->precision, pNode->node.aliasName); - pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr; + SDataType* pType = &pOpNode->node.resType; + pExp->base.resSchema = + createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName); + pExp->pExpr->_optrRoot.pRootNode = pNode; } else { ASSERT(0); } } +void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) { + createExprFromOneNode(pExp, pTargetNode->pExpr, pTargetNode->slotId); +} + SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) { int32_t numOfFuncs = LIST_LENGTH(pNodeList); int32_t numOfGroupKeys = 0; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a8c73f0170..e527f7e7c1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -49,17 +49,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SStreamScanInfo* pInfo = pOperator->info; -#if 0 - // TODO: if a block was set but not consumed, - // prevent setting a different type of block - pInfo->validBlockIndex = 0; - if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { - taosArrayClearP(pInfo->pBlockLists, taosMemoryFree); - } else { - taosArrayClear(pInfo->pBlockLists); - } -#endif - ASSERT(pInfo->validBlockIndex == 0); ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0); @@ -71,30 +60,13 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_SUBMIT) { - /*if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {*/ - /*qError("submit msg messed up when initing stream block, %s" PRIx64, id);*/ - /*return TSDB_CODE_QRY_APP_ERROR;*/ - /*}*/ ASSERT(numOfBlocks == 1); - /*if (numOfBlocks == 1) {*/ taosArrayPush(pInfo->pBlockLists, &input); pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; - /*} else {*/ - /*}*/ } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; taosArrayPush(pInfo->pBlockLists, &pDataBlock); - -#if 0 - // TODO optimize - SSDataBlock* p = createOneDataBlock(pDataBlock, false); - p->info = pDataBlock->info; - - taosArrayClear(p->pDataBlock); - taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); - taosArrayPush(pInfo->pBlockLists, &p); -#endif } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { @@ -107,27 +79,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); } -void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { -#if 0 - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) { - return; - } - SOperatorInfo* pOptrInfo = pTaskInfo->pRoot->pDownstream[0]; - - if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamScanInfo* pInfo = pOptrInfo->info; - if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { - taosArrayClearP(pInfo->pBlockLists, streamInputBlockDataDestory); - } else { - ASSERT(0); - } - } else { - ASSERT(0); - } -#endif -} - int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; @@ -330,8 +281,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } - bool exists = false; #if 0 + bool exists = false; for (int32_t k = 0; k < taosArrayGetSize(pListInfo->pTableList); ++k) { STableKeyInfo* pKeyInfo = taosArrayGet(pListInfo->pTableList, k); if (pKeyInfo->uid == keyInfo.uid) { @@ -339,14 +290,15 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo exists = true; } } -#endif if (!exists) { - taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); - taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); - } +#endif + taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); + taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); } + /*}*/ + if (keyBuf != NULL) { taosMemoryFree(keyBuf); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5db55b02f8..44cd665052 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4214,8 +4214,9 @@ int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, return TSDB_CODE_SUCCESS; } -int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, +int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExprInfo* pExprInfo = pSup->pExprInfo; int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; @@ -4244,6 +4245,11 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SStreamState* pStat if (pBlock->info.groupId == 0) { pBlock->info.groupId = pPos->groupId; + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); + if (tbname != NULL) { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.groupId != pPos->groupId) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 7cb641a943..5e7afa1303 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -873,6 +873,26 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull); } pDest->info.rows++; + if (i == 0) { + SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); + SSDataBlock* pResBlock = createDataBlock(); + pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; + SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); + taosArrayPush(pResBlock->pDataBlock, &data); + blockDataEnsureCapacity(pResBlock, 1); + projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL); + ASSERT(pResBlock->info.rows == 1); + ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); + ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); + void* pData = colDataGetData(pCol, 0); + // TODO check tbname validation + if (pData != (void*)-1) { + memcpy(pDest->info.parTbName, varDataVal(pData), varDataLen(pData)); + } else { + pDest->info.parTbName[0] = 0; + } + } } blockDataUpdateTsWindow(pDest, pInfo->tsColIndex); pDest->info.groupId = pParInfo->groupId; @@ -895,6 +915,7 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat } else { SPartitionDataInfo newParData = {0}; newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen); + /*newParData.tbname = */ newParData.rowIds = taosArrayInit(64, sizeof(int32_t)); taosArrayPush(newParData.rowIds, &i); taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, sizeof(SPartitionDataInfo)); @@ -1000,6 +1021,20 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr } } + if (pPartNode->pSubtable != NULL) { + SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo)); + if (pSubTableExpr == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + pInfo->tbnameCalSup.pExprInfo = pSubTableExpr; + createExprFromOneNode(pSubTableExpr, pPartNode->pSubtable, 0); + code = initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + int32_t keyLen = 0; code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupCols); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2251d09776..3752a9f211 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -924,29 +924,9 @@ _error: return NULL; } -static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { -#if 0 - if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { - size_t total = taosArrayGetSize(pInfo->pBlockLists); - for (int32_t i = 0; i < total; i++) { - SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i); - taosArrayDestroy(p->pDataBlock); - taosMemoryFree(p); - } - } -#endif +static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) { taosArrayClear(pInfo->pBlockLists); pInfo->validBlockIndex = 0; -#if 0 - size_t total = taosArrayGetSize(pInfo->pBlockLists); - - pInfo->validBlockIndex = 0; - for (int32_t i = 0; i < total; ++i) { - SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i); - blockDataDestroy(p); - } - taosArrayClear(pInfo->pBlockLists); -#endif } static bool isSessionWindow(SStreamScanInfo* pInfo) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index dceb696d54..ab72ad97ca 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1536,6 +1536,7 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp } } tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter); + /*taosHashRemove(pInfo->pGroupIdTbNameMap, &pWinKey->groupId, sizeof(int64_t));*/ } } return TSDB_CODE_SUCCESS; @@ -3045,7 +3046,7 @@ void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* // clear the existed group id pBlock->info.groupId = 0; - buildDataBlockFromGroupRes(pTaskInfo, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); + buildDataBlockFromGroupRes(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); } static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, @@ -3240,6 +3241,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv"); + if (pBlock->info.parTbName[0]) { + taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, + TSDB_TABLE_NAME_LEN); + } + ASSERT(pBlock->info.type != STREAM_INVERT); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; @@ -3477,6 +3483,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; + pInfo->pGroupIdTbNameMap = + taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; @@ -5659,6 +5668,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, "single interval recv"); + if (pBlock->info.parTbName[0]) { + taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, + TSDB_TABLE_NAME_LEN); + } + if (pBlock->info.type == STREAM_CLEAR) { doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, NULL); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); @@ -5806,6 +5820,9 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; + pInfo->pGroupIdTbNameMap = + taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + pOperator->name = "StreamIntervalOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; pOperator->blocking = true; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 4fb55ed373..86930268f1 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -783,7 +783,7 @@ static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTa pTbReq->name = strdup(tname); pTbReq->ctb.suid = suid; pTbReq->ctb.tagNum = tagNum; - if (sname) pTbReq->ctb.name = strdup(sname); + if (sname) pTbReq->ctb.stbName = strdup(sname); pTbReq->ctb.pTag = (uint8_t*)pTag; pTbReq->ctb.tagName = taosArrayDup(tagName); pTbReq->ttl = TSDB_DEFAULT_TABLE_TTL; @@ -2469,9 +2469,9 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols pTableMeta->tableInfo.numOfTags); taosArrayDestroy(tagName); - smlHandle->tableExecHandle.createTblReq.ctb.name = taosMemoryMalloc(sTableNameLen + 1); - memcpy(smlHandle->tableExecHandle.createTblReq.ctb.name, sTableName, sTableNameLen); - smlHandle->tableExecHandle.createTblReq.ctb.name[sTableNameLen] = 0; + smlHandle->tableExecHandle.createTblReq.ctb.stbName = taosMemoryMalloc(sTableNameLen + 1); + memcpy(smlHandle->tableExecHandle.createTblReq.ctb.stbName, sTableName, sTableNameLen); + smlHandle->tableExecHandle.createTblReq.ctb.stbName[sTableNameLen] = 0; STableDataBlocks* pDataBlock = NULL; ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 882fa8950c..ab337a985f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5381,9 +5381,9 @@ static EDealRes rewriteSubtable(SNode** pNode, void* pContext) { found = true; break; } - if (!found) { - return generateDealNodeErrMsg(pCxt->pCxt, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnNode*)*pNode)->colName); - } + } + if (!found) { + return generateDealNodeErrMsg(pCxt->pCxt, TSDB_CODE_PAR_INVALID_COLUMN, ((SColumnNode*)*pNode)->colName); } return DEAL_RES_IGNORE_CHILD; } @@ -6454,7 +6454,7 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S } req.ctb.suid = suid; req.ctb.tagNum = tagNum; - req.ctb.name = strdup(sTableNmae); + req.ctb.stbName = strdup(sTableNmae); req.ctb.pTag = (uint8_t*)pTag; req.ctb.tagName = taosArrayDup(tagName); if (pStmt->ignoreExists) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index e6705a77b2..2fea5b9eca 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -35,6 +35,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); pDataBlock->info.version = be64toh(pRetrieve->version); pDataBlock->info.watermark = be64toh(pRetrieve->watermark); + memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN); pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.childId = pReq->upstreamChildId; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e6960ae350..9d8a44c1ef 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -195,6 +195,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->version = htobe64(pBlock->info.version); pRetrieve->watermark = htobe64(pBlock->info.watermark); + memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); pRetrieve->numOfCols = htonl(numOfCols); @@ -250,7 +251,13 @@ FAIL: int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, int64_t groupId) { - char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId); + char* ctbName; + if (pDataBlock->info.parTbName[0]) { + ctbName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); + } else { + ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId); + } SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index da0d0fbd6d..8c851127dd 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -60,6 +60,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { if (!specPath) { sprintf(statePath, "%s/%d", path, pTask->taskId); } else { + memset(statePath, 0, 300); strncpy(statePath, path, 300); } if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) {