diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 9c1f2063a7..77f1879b81 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -174,15 +174,28 @@ typedef struct SColumnDataAgg { } SColumnDataAgg; #pragma pack(pop) +typedef struct SBlockID { + // The uid of table, from which current data block comes. And it is always 0, if current block is the + // result of calculation. + uint64_t uid; + + // Block id, acquired and assigned from executor, which created according to the hysical planner. Block id is used + // to mark the stage of exec task. + uint64_t blockId; + + // Generated by group/partition by [value|tags]. Created and assigned by table-scan operator, group-by operator, + // and partition by operator. + uint64_t groupId; +} SBlockID; + typedef struct SDataBlockInfo { STimeWindow window; - int32_t rows; // todo hide this attribute int32_t rowSize; - uint64_t uid; // the uid of table, from which current data block comes - uint16_t blockId; // block id, generated by physical planner - uint64_t groupId; - int16_t hasVarCol; + int32_t rows; // todo hide this attribute uint32_t capacity; + SBlockID id; + int16_t hasVarCol; + // TODO: optimize and remove following int64_t version; // used for stream, and need serialization int32_t childId; // used for stream, do not serialize @@ -190,8 +203,8 @@ typedef struct SDataBlockInfo { STimeWindow calWin; // used for stream, do not serialize TSKEY watermark; // used for stream - char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition - STag* pTag; // used for stream partition + char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition + STag* pTag; // used for stream partition } SDataBlockInfo; typedef struct SSDataBlock { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index cfa2964e16..a0f795a729 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -621,7 +621,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { // todo remove this int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) { pBlock->info.rows = *(int32_t*)buf; - pBlock->info.groupId = *(uint64_t*)(buf + sizeof(int32_t)); + pBlock->info.id.groupId = *(uint64_t*)(buf + sizeof(int32_t)); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -1140,7 +1140,8 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { SDataBlockInfo* pInfo = &pDataBlock->info; pInfo->rows = 0; - pInfo->groupId = 0; + pInfo->id.uid = 0; + pInfo->id.groupId = 0; pInfo->window.ekey = 0; pInfo->window.skey = 0; @@ -1334,7 +1335,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) { SSDataBlock* createSpecialDataBlock(EStreamType type) { SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); pBlock->info.hasVarCol = false; - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; pBlock->info.rows = 0; pBlock->info.type = type; pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + @@ -1675,7 +1676,7 @@ int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) { } int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { - int64_t tbUid = pBlock->info.uid; + int64_t tbUid = pBlock->info.id.uid; int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int16_t hasVarCol = pBlock->info.hasVarCol; int32_t rows = pBlock->info.rows; @@ -1713,7 +1714,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - buf = taosDecodeFixedU64(buf, &pBlock->info.uid); + buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid); buf = taosDecodeFixedI16(buf, &numOfCols); buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol); buf = taosDecodeFixedI32(buf, &pBlock->info.rows); @@ -1834,7 +1835,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { int32_t rows = pDataBlock->info.rows; printf("%s |block ver %" PRIi64 " |block type %d |child id %d|group id %" PRIu64 "\n", flag, pDataBlock->info.version, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, - pDataBlock->info.groupId); + pDataBlock->info.id.groupId); for (int32_t j = 0; j < rows; j++) { printf("%s |", flag); for (int32_t k = 0; k < numOfCols; k++) { @@ -1905,8 +1906,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) len += snprintf(dumpBuf + len, size - len, "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%d|version:%" PRIu64 "\n", - flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId, - pDataBlock->info.uid, pDataBlock->info.rows, pDataBlock->info.version); + flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, + pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version); if (len >= size - 1) return dumpBuf; for (int32_t j = 0; j < rows; j++) { @@ -2035,8 +2036,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB for (int32_t i = 0; i < sz; ++i) { int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; - // int32_t rowSize = pDataBlock->info.rowSize; - // int64_t groupId = pDataBlock->info.groupId; if (colNum <= 1) { // invalid if only with TS col @@ -2049,7 +2048,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen); pSubmitBlk->suid = suid; - pSubmitBlk->uid = pDataBlock->info.groupId; + pSubmitBlk->uid = pDataBlock->info.id.groupId; pSubmitBlk->numOfRows = rows; pSubmitBlk->sversion = pTSchema->version; @@ -2292,7 +2291,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { } *actualLen = dataLen; - *groupId = pBlock->info.groupId; + *groupId = pBlock->info.id.groupId; ASSERT(dataLen > 0); uDebug("build data block, actualLen:%d, rows:%d, cols:%d", dataLen, *rows, *cols); @@ -2325,7 +2324,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += sizeof(int32_t); // group id sizeof(uint64_t) - pBlock->info.groupId = *(uint64_t*)pStart; + pBlock->info.id.groupId = *(uint64_t*)pStart; pStart += sizeof(uint64_t); if (pBlock->pDataBlock == NULL) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 51842a8ae4..6bd2ae3435 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -708,7 +708,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma #endif for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { SSDataBlock *output = taosArrayGetP(pResList, i); - smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.uid, output->info.groupId, + smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.id.uid, output->info.id.groupId, output->info.rows); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); @@ -718,7 +718,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%" PRIu64 ", level %" PRIi8 " failed since %s", - SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr()); + SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr()); goto _err; } @@ -726,13 +726,13 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma taosMemoryFreeClear(pReq); smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8 " failed since %s", - SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr()); + SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr()); goto _err; } smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64 " len %" PRIu32, - SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version, + SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version, htonl(pReq->header.contLen)); taosMemoryFreeClear(pReq); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index e41b1d8aa8..c3a4cefc66 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -530,7 +530,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter); - pBlock->info.uid = pReader->msgIter.uid; + pBlock->info.id.uid = pReader->msgIter.uid; pBlock->info.rows = pReader->msgIter.numOfRows; pBlock->info.version = pReader->pMsg->version; @@ -649,7 +649,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas } SSDataBlock* pBlock = taosArrayGetLast(blocks); - pBlock->info.uid = pReader->msgIter.uid; + pBlock->info.id.uid = pReader->msgIter.uid; pBlock->info.rows = 0; pBlock->info.version = pReader->pMsg->version; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 30cde5d475..a4bfb6c876 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -103,7 +103,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem // STagVal tagVal = { // .cid = pTagSchemaWrapper->pSchema[j].colId, // .type = pTagSchemaWrapper->pSchema[j].type, - // .i64 = (int64_t)pDataBlock->info.groupId, + // .i64 = (int64_t)pDataBlock->info.id.groupId, // }; // taosArrayPush(tagArray, &tagVal); // taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name); @@ -134,7 +134,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem STagVal tagVal = { .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.groupId, + .i64 = (int64_t)pDataBlock->info.id.groupId, }; taosArrayPush(tagArray, &tagVal); createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); @@ -161,7 +161,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem if (pDataBlock->info.parTbName[0]) { createTbReq.name = strdup(pDataBlock->info.parTbName); } else { - createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); + createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); } // save schema len @@ -358,7 +358,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d if (pDataBlock->info.parTbName[0]) { ctbName = strdup(pDataBlock->info.parTbName); } else { - ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); + ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); } int32_t schemaLen = 0; @@ -390,7 +390,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d STagVal tagVal = { .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.groupId, + .i64 = (int64_t)pDataBlock->info.id.groupId, }; taosArrayPush(tagArray, &tagVal); createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b97f82f3d6..f2515c6d48 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1623,7 +1623,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader); blockDataUpdateTsWindow(pBlock, 0); - pBlock->info.uid = pBlockScanInfo->uid; + pBlock->info.id.uid = pBlockScanInfo->uid; setComposedBlockFlag(pReader, true); @@ -2493,7 +2493,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } _end: - pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; + pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; blockDataUpdateTsWindow(pResBlock, 0); setComposedBlockFlag(pReader, true); @@ -2505,7 +2505,7 @@ _end: if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s", - pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, + pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } @@ -2829,7 +2829,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } else { // whole block is required, return it directly SDataBlockInfo* pInfo = &pReader->pResBlock->info; pInfo->rows = pBlock->nRow; - pInfo->uid = pScanInfo->uid; + pInfo->id.uid = pScanInfo->uid; pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts}; setComposedBlockFlag(pReader, false); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order); @@ -4018,7 +4018,7 @@ bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) { ASSERT(pReader != NULL); *rows = pReader->pResBlock->info.rows; - *uid = pReader->pResBlock->info.uid; + *uid = pReader->pResBlock->info.id.uid; *pWindow = pReader->pResBlock->info.window; } diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 0278a11f80..be101059f2 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -436,7 +436,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { pDataBlock->pBlockAgg = NULL; taosArrayGetSize(pDataBlock->pDataBlock) = tSmaNumOfCols; pDataBlock->info.rows = tSmaNumOfRows; - pDataBlock->info.groupId = tSmaGroupId + g; + pDataBlock->info.id.groupId = tSmaGroupId + g; pDataBlock->pDataBlock = taosArrayInit(tSmaNumOfCols, sizeof(SColumnInfoData *)); EXPECT_NE(pDataBlock->pDataBlock, nullptr); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index cdd744bded..c432f3c01c 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -185,7 +185,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } } - pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); + pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); pRes->info.rows = 1; SExprSupp* pSup = &pInfo->pseudoExprSup; @@ -196,7 +196,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { return NULL; } - pRes->info.groupId = getTableGroupId(pTableList, pRes->info.uid); + pRes->info.id.groupId = getTableGroupId(pTableList, pRes->info.id.uid); pInfo->indexOfBufferedRes += 1; return pRes; } else { @@ -232,12 +232,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SExprSupp* pSup = &pInfo->pseudoExprSup; STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0]; - pInfo->pRes->info.groupId = pKeyInfo->groupId; + pInfo->pRes->info.id.groupId = pKeyInfo->groupId; if (taosArrayGetSize(pInfo->pUidList) > 0) { ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); - pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); + pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 280880c077..b2ddff45a4 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -717,10 +717,10 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) { if (pLimitInfo->remainGroupOffset > 0) { if (pLimitInfo->currentGroupId == 0) { // it is the first group - pLimitInfo->currentGroupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.id.groupId; blockDataCleanup(pBlock); return PROJECT_RETRIEVE_CONTINUE; - } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) { + } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) { // now it is the data from a new group pLimitInfo->remainGroupOffset -= 1; @@ -732,11 +732,11 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa } // set current group id of the project operator - pLimitInfo->currentGroupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.id.groupId; } // here check for a new group data, we need to handle the data of the previous group. - if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) { + if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) { pLimitInfo->numOfOutputGroups += 1; if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { pOperator->status = OP_EXEC_DONE; @@ -758,7 +758,7 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa // here we reach the start position, according to the limit/offset requirements. // set current group id - pLimitInfo->currentGroupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.id.groupId; if (pLimitInfo->remainOffset >= pBlock->info.rows) { pLimitInfo->remainOffset -= pBlock->info.rows; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 08e6e4792b..b135566caa 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -213,7 +213,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { SSDataBlock* pBlock = createDataBlock(); - pBlock->info.blockId = pNode->dataBlockId; + pBlock->info.id.blockId = pNode->dataBlockId; pBlock->info.type = STREAM_INVALID; pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; pBlock->info.watermark = INT64_MIN; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5abde1be85..514236159d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -447,7 +447,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int pCtx[i].scanFlag = scanFlag; SInputColumnInfoData* pInput = &pCtx[i].input; - pInput->uid = pBlock->info.uid; + pInput->uid = pBlock->info.id.uid; pInput->colDataSMAIsSet = false; SExprInfo* pOneExpr = &pExprSup->pExprInfo[i]; @@ -506,184 +506,6 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { - size_t num = (pPseudoList != NULL) ? taosArrayGetSize(pPseudoList) : 0; - for (int32_t i = 0; i < num; ++i) { - pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i); - } -} - -int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, - int32_t numOfOutput, SArray* pPseudoList) { - setPseudoOutputColInfo(pResult, pCtx, pPseudoList); - - if (pSrcBlock == NULL) { - for (int32_t k = 0; k < numOfOutput; ++k) { - int32_t outputSlotId = pExpr[k].base.resSchema.slotId; - - ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE); - SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - - int32_t type = pExpr[k].base.pParam[0].param.nType; - if (TSDB_DATA_TYPE_NULL == type) { - colDataAppendNNULL(pColInfoData, 0, 1); - } else { - colDataAppend(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false); - } - } - - pResult->info.rows = 1; - return TSDB_CODE_SUCCESS; - } - - if (pResult != pSrcBlock) { - pResult->info.groupId = pSrcBlock->info.groupId; - memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); - } - - // if the source equals to the destination, it is to create a new column as the result of scalar - // function or some operators. - bool createNewColModel = (pResult == pSrcBlock); - if (createNewColModel) { - blockDataEnsureCapacity(pResult, pResult->info.rows); - } - - int32_t numOfRows = 0; - - for (int32_t k = 0; k < numOfOutput; ++k) { - int32_t outputSlotId = pExpr[k].base.resSchema.slotId; - SqlFunctionCtx* pfCtx = &pCtx[k]; - SInputColumnInfoData* pInputData = &pfCtx->input; - - if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query - SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - if (pResult->info.rows > 0 && !createNewColModel) { - colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0], - pInputData->numOfRows); - } else { - colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info); - } - - numOfRows = pInputData->numOfRows; - } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { - SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - - int32_t offset = createNewColModel ? 0 : pResult->info.rows; - - int32_t type = pExpr[k].base.pParam[0].param.nType; - if (TSDB_DATA_TYPE_NULL == type) { - colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows); - } else { - for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { - colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false); - } - } - - numOfRows = pSrcBlock->info.rows; - } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) { - SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); - taosArrayPush(pBlockList, &pSrcBlock); - - SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); - SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; - - SScalarParam dest = {.columnData = &idata}; - int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pBlockList); - return code; - } - - int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; - ASSERT(pResult->info.capacity > 0); - - colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows); - colDataDestroy(&idata); - - numOfRows = dest.numOfRows; - taosArrayDestroy(pBlockList); - } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { - // _rowts/_c0, not tbname column - if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) { - // do nothing - } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) { - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx); - pfCtx->fpSet.init(pfCtx, pResInfo); - - pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); - pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset - - // set the timestamp(_rowts) output buffer - if (taosArrayGetSize(pPseudoList) > 0) { - int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); - pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; - } - - // link pDstBlock to set selectivity value - if (pfCtx->subsidiaries.num > 0) { - pfCtx->pDstBlock = pResult; - } - - int32_t code = pfCtx->fpSet.process(pfCtx); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - numOfRows = pResInfo->numOfRes; - } else if (fmIsAggFunc(pfCtx->functionId)) { - // selective value output should be set during corresponding function execution - if (fmIsSelectValueFunc(pfCtx->functionId)) { - continue; - } - // _group_key function for "partition by tbname" + csum(col_name) query - SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); - int32_t slotId = pfCtx->param[0].pCol->slotId; - - // todo handle the json tag - SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId); - for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) { - bool isNull = colDataIsNull_s(pInput, f); - if (isNull) { - colDataAppendNULL(pOutput, pResult->info.rows + f); - } else { - char* data = colDataGetData(pInput, f); - colDataAppend(pOutput, pResult->info.rows + f, data, isNull); - } - } - - } else { - SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); - taosArrayPush(pBlockList, &pSrcBlock); - - SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); - SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; - - SScalarParam dest = {.columnData = &idata}; - int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pBlockList); - return code; - } - - int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; - ASSERT(pResult->info.capacity > 0); - colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows); - colDataDestroy(&idata); - - numOfRows = dest.numOfRows; - taosArrayDestroy(pBlockList); - } - } else { - return TSDB_CODE_OPS_NOT_SUPPORT; - } - } - - if (!createNewColModel) { - pResult->info.rows += numOfRows; - } - - return TSDB_CODE_SUCCESS; -} - bool functionNeedToExecute(SqlFunctionCtx* pCtx) { struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -862,7 +684,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowEntryInfoOffset); } else { - if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId, + if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -913,7 +735,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); - if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId, + if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1245,11 +1067,11 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS continue; } - if (pBlock->info.groupId == 0) { - pBlock->info.groupId = pPos->groupId; + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = pPos->groupId; } else { // current value belongs to different group, it can't be packed into one datablock - if (pBlock->info.groupId != pPos->groupId) { + if (pBlock->info.id.groupId != pPos->groupId) { releaseBufPage(pBuf, page); break; } @@ -1269,7 +1091,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS } qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, - pBlock->info.groupId); + pBlock->info.id.groupId); blockDataUpdateTsWindow(pBlock, 0); return 0; @@ -1289,12 +1111,12 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr } // clear the existed group id - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; ASSERT(!pbInfo->mergeResultBlock); doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); void* tbname = NULL; - if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { + if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -1316,7 +1138,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG } // clear the existed group id - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; if (!pbInfo->mergeResultBlock) { doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); } else { @@ -1327,11 +1149,11 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG } // clearing group id to continue to merge data that belong to different groups - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; } // clear the group id info in SSDataBlock, since the client does not need it - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; } } @@ -1635,7 +1457,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId); + setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); setInputDataBlock(pSup, pBlock, order, scanFlag, true); code = doAggregateImpl(pOperator, pSup->pCtx); if (code != 0) { @@ -1712,7 +1534,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows; taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows); - pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId; + pInfo->curGroupId = pInfo->existNewGroupBlock->info.id.groupId; pInfo->existNewGroupBlock = NULL; } @@ -1721,7 +1543,7 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera if (taosFillHasMoreResults(pInfo->pFillInfo)) { int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows; taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows); - pInfo->pRes->info.groupId = pInfo->curGroupId; + pInfo->pRes->info.id.groupId = pInfo->curGroupId; return; } @@ -1743,7 +1565,7 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlo setInputDataBlock(pNoFillSupp, pBlock, order, scanFlag, false); projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs, NULL); - pInfo->pRes->info.groupId = pBlock->info.groupId; + pInfo->pRes->info.id.groupId = pBlock->info.id.groupId; } static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { @@ -1761,7 +1583,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows > 0) { - pResBlock->info.groupId = pInfo->curGroupId; + pResBlock->info.id.groupId = pInfo->curGroupId; return pResBlock; } @@ -1783,8 +1605,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows); doApplyScalarCalculation(pOperator, pBlock, order, scanFlag); - if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) { - pInfo->curGroupId = pInfo->pRes->info.groupId; // the first data block + if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.id.groupId) { + pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block pInfo->totalInputRows += pInfo->pRes->info.rows; if (order == pInfo->pFillInfo->order) { @@ -1793,7 +1615,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.skey); } taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes); - } else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block + } else if (pInfo->curGroupId != pBlock->info.id.groupId) { // the new group data block pInfo->existNewGroupBlock = pBlock; // Fill the previous group data block, before handle the data block of new group. @@ -1810,13 +1632,13 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { // 1. The result in current group not reach the threshold of output result, continue // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) { - pResBlock->info.groupId = pInfo->curGroupId; + pResBlock->info.id.groupId = pInfo->curGroupId; return pResBlock; } doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) { - pResBlock->info.groupId = pInfo->curGroupId; + pResBlock->info.id.groupId = pInfo->curGroupId; return pResBlock; } } else if (pInfo->existNewGroupBlock) { // try next group @@ -1826,7 +1648,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows > pResultInfo->threshold) { - pResBlock->info.groupId = pInfo->curGroupId; + pResBlock->info.id.groupId = pInfo->curGroupId; return pResBlock; } } else { @@ -2967,10 +2789,10 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat continue; } - if (pBlock->info.groupId == 0) { - pBlock->info.groupId = pPos->groupId; + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = pPos->groupId; void* tbname = NULL; - if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { + if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -2978,7 +2800,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat tdbFree(tbname); } else { // current value belongs to different group, it can't be packed into one datablock - if (pBlock->info.groupId != pPos->groupId) { + if (pBlock->info.id.groupId != pPos->groupId) { releaseOutputBuf(pState, &key, pRow); break; } @@ -3058,11 +2880,11 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta continue; } - if (pBlock->info.groupId == 0) { - pBlock->info.groupId = pKey->groupId; + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = pKey->groupId; void* tbname = NULL; - if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { + if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -3070,7 +2892,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta tdbFree(tbname); } else { // current value belongs to different group, it can't be packed into one datablock - if (pBlock->info.groupId != pKey->groupId) { + if (pBlock->info.id.groupId != pKey->groupId) { releaseOutputBuf(pState, NULL, pRow); break; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 6dc8818900..4601175561 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -308,7 +308,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, - len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); + len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } @@ -325,7 +325,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { if (num > 0) { len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, - len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); + len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } @@ -697,7 +697,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { releaseBufPage(pInfo->pBuf, page); blockDataUpdateTsWindow(pInfo->binfo.pRes, 0); - pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId; + pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId; pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows; return pInfo->binfo.pRes; @@ -952,7 +952,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { taosArrayDestroy(pParInfo->rowIds); pParInfo->rowIds = NULL; blockDataUpdateTsWindow(pDest, pInfo->tsColIndex); - pDest->info.groupId = pParInfo->groupId; + pDest->info.id.groupId = pParInfo->groupId; pOperator->resultInfo.totalRows += pDest->info.rows; pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte); ASSERT(pDest->info.rows > 0); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 3839af9913..e7cce39dfd 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -185,7 +185,7 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* int32_t rowIndex = -1; SColumnInfoData* pSrc = NULL; - if (pLeftBlock->info.blockId == blockId) { + if (pLeftBlock->info.id.blockId == blockId) { pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId); rowIndex = leftPos; } else { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 4bba3a72e1..da77facb21 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -138,13 +138,13 @@ _error: static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo) { if (pLimitInfo->remainGroupOffset > 0) { // it is the first group - if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.groupId) { - pLimitInfo->currentGroupId = pBlock->info.groupId; + if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) { + pLimitInfo->currentGroupId = pBlock->info.id.groupId; return PROJECT_RETRIEVE_CONTINUE; - } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) { + } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) { // now it is the data from a new group pLimitInfo->remainGroupOffset -= 1; - pLimitInfo->currentGroupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.id.groupId; // ignore data block in current group if (pLimitInfo->remainGroupOffset > 0) { @@ -153,7 +153,7 @@ static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo } // set current group id of the project operator - pLimitInfo->currentGroupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.id.groupId; } return PROJECT_RETRIEVE_DONE; @@ -164,7 +164,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S // here check for a new group data, we need to handle the data of the previous group. ASSERT(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1); - if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) { + if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) { pLimitInfo->numOfOutputGroups += 1; if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { setOperatorCompleted(pOperator); @@ -306,7 +306,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - status = doIngroupLimitOffset(pLimitInfo, pBlock->info.groupId, pInfo->pRes, pOperator); + status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator); if (status == PROJECT_RETRIEVE_CONTINUE) { continue; } @@ -316,7 +316,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { if (pProjectInfo->mergeDataBlocks) { if (pRes->info.rows > 0) { - pFinalRes->info.groupId = pRes->info.groupId; + pFinalRes->info.id.groupId = pRes->info.id.groupId; pFinalRes->info.version = pRes->info.version; // continue merge data, ignore the group id @@ -511,11 +511,11 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { break; } - if (pIndefInfo->groupId == 0 && pBlock->info.groupId != 0) { - pIndefInfo->groupId = pBlock->info.groupId; // this is the initial group result + if (pIndefInfo->groupId == 0 && pBlock->info.id.groupId != 0) { + pIndefInfo->groupId = pBlock->info.id.groupId; // this is the initial group result } else { - if (pIndefInfo->groupId != pBlock->info.groupId) { // reset output buffer and computing status - pIndefInfo->groupId = pBlock->info.groupId; + if (pIndefInfo->groupId != pBlock->info.id.groupId) { // reset output buffer and computing status + pIndefInfo->groupId = pBlock->info.id.groupId; pIndefInfo->pNextGroupRes = pBlock; break; } @@ -643,3 +643,182 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { return (pRes->info.rows > 0) ? pRes : NULL; } + +static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { + size_t num = (pPseudoList != NULL) ? taosArrayGetSize(pPseudoList) : 0; + for (int32_t i = 0; i < num; ++i) { + pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i); + } +} + +int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, + int32_t numOfOutput, SArray* pPseudoList) { + setPseudoOutputColInfo(pResult, pCtx, pPseudoList); + + if (pSrcBlock == NULL) { + for (int32_t k = 0; k < numOfOutput; ++k) { + int32_t outputSlotId = pExpr[k].base.resSchema.slotId; + + ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE); + SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); + + int32_t type = pExpr[k].base.pParam[0].param.nType; + if (TSDB_DATA_TYPE_NULL == type) { + colDataAppendNNULL(pColInfoData, 0, 1); + } else { + colDataAppend(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false); + } + } + + pResult->info.rows = 1; + return TSDB_CODE_SUCCESS; + } + + if (pResult != pSrcBlock) { + pResult->info.id.groupId = pSrcBlock->info.id.groupId; + memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); + } + + // if the source equals to the destination, it is to create a new column as the result of scalar + // function or some operators. + bool createNewColModel = (pResult == pSrcBlock); + if (createNewColModel) { + blockDataEnsureCapacity(pResult, pResult->info.rows); + } + + int32_t numOfRows = 0; + + for (int32_t k = 0; k < numOfOutput; ++k) { + int32_t outputSlotId = pExpr[k].base.resSchema.slotId; + SqlFunctionCtx* pfCtx = &pCtx[k]; + SInputColumnInfoData* pInputData = &pfCtx->input; + + if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query + SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); + if (pResult->info.rows > 0 && !createNewColModel) { + colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0], + pInputData->numOfRows); + } else { + colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info); + } + + numOfRows = pInputData->numOfRows; + } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { + SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); + + int32_t offset = createNewColModel ? 0 : pResult->info.rows; + + int32_t type = pExpr[k].base.pParam[0].param.nType; + if (TSDB_DATA_TYPE_NULL == type) { + colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows); + } else { + char* p = taosVariantGet(&pExpr[k].base.pParam[0].param, type); + for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { + colDataAppend(pColInfoData, i + offset, p, false); + } + } + + numOfRows = pSrcBlock->info.rows; + } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) { + SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); + taosArrayPush(pBlockList, &pSrcBlock); + + SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; + + SScalarParam dest = {.columnData = &idata}; + int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pBlockList); + return code; + } + + int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; + ASSERT(pResult->info.capacity > 0); + + colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows); + colDataDestroy(&idata); + + numOfRows = dest.numOfRows; + taosArrayDestroy(pBlockList); + } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { + // _rowts/_c0, not tbname column + if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) { + // do nothing + } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx); + pfCtx->fpSet.init(pfCtx, pResInfo); + + pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); + pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset + + // set the timestamp(_rowts) output buffer + if (taosArrayGetSize(pPseudoList) > 0) { + int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); + pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; + } + + // link pDstBlock to set selectivity value + if (pfCtx->subsidiaries.num > 0) { + pfCtx->pDstBlock = pResult; + } + + int32_t code = pfCtx->fpSet.process(pfCtx); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + numOfRows = pResInfo->numOfRes; + } else if (fmIsAggFunc(pfCtx->functionId)) { + // selective value output should be set during corresponding function execution + if (fmIsSelectValueFunc(pfCtx->functionId)) { + continue; + } + // _group_key function for "partition by tbname" + csum(col_name) query + SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); + int32_t slotId = pfCtx->param[0].pCol->slotId; + + // todo handle the json tag + SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId); + for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) { + bool isNull = colDataIsNull_s(pInput, f); + if (isNull) { + colDataAppendNULL(pOutput, pResult->info.rows + f); + } else { + char* data = colDataGetData(pInput, f); + colDataAppend(pOutput, pResult->info.rows + f, data, isNull); + } + } + + } else { + SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); + taosArrayPush(pBlockList, &pSrcBlock); + + SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; + + SScalarParam dest = {.columnData = &idata}; + int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pBlockList); + return code; + } + + int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; + ASSERT(pResult->info.capacity > 0); + colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows); + colDataDestroy(&idata); + + numOfRows = dest.numOfRows; + taosArrayDestroy(pBlockList); + } + } else { + return TSDB_CODE_OPS_NOT_SUPPORT; + } + } + + if (!createNewColModel) { + pResult->info.rows += numOfRows; + } + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c0bea731bd..84b7678b9f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -184,7 +184,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* SExprSupp* pSup1 = pTableScanInfo->base.pdInfo.pExprSup; SFilePage* pPage = NULL; - SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pPage); + SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage); if (pRow == NULL) { return TSDB_CODE_SUCCESS; @@ -484,13 +484,13 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int // 1. check if it is existed in meta cache if (pCache == NULL) { metaReaderInit(&mr, pHandle->meta, 0); - code = metaGetTableEntryByUidCache(&mr, pBlock->info.uid); + code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { - qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, + qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno), idStr); } else { - qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno), idStr); } metaReaderClear(&mr); return terrno; @@ -505,16 +505,16 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int } else { pCache->metaFetch += 1; - h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(pBlock->info.uid)); + h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); if (h == NULL) { metaReaderInit(&mr, pHandle->meta, 0); - code = metaGetTableEntryByUidCache(&mr, pBlock->info.uid); + code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", - pBlock->info.uid, tstrerror(terrno), idStr); + pBlock->info.id.uid, tstrerror(terrno), idStr); } else { - qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno), idStr); } metaReaderClear(&mr); @@ -528,7 +528,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int val = *pVal; freeReader = true; - int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(uint64_t), pVal, + int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal, sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW); if (ret != TAOS_LRU_STATUS_OK) { qError("failed to put meta into lru cache, code:%d, %s", ret, idStr); @@ -642,13 +642,13 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { SDataBlockInfo* pBInfo = &pBlock->info; int32_t rows = 0; - tsdbRetrieveDataBlockInfo(pTableScanInfo->base.dataReader, &rows, &pBInfo->uid, &pBInfo->window); + tsdbRetrieveDataBlockInfo(pTableScanInfo->base.dataReader, &rows, &pBInfo->id.uid, &pBInfo->window); blockDataEnsureCapacity(pBlock, rows); // todo remove it latter pBInfo->rows = rows; - ASSERT(pBInfo->uid != 0); - pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); + ASSERT(pBInfo->id.uid != 0); + pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); uint32_t status = 0; int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); @@ -668,13 +668,13 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime; // todo refactor - /*pTableScanInfo->lastStatus.uid = pBlock->info.uid;*/ + /*pTableScanInfo->lastStatus.uid = pBlock->info.id.uid;*/ /*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/ pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; - pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid; + pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid; pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; - ASSERT(pBlock->info.uid != 0); + ASSERT(pBlock->info.id.uid != 0); return pBlock; } return NULL; @@ -786,7 +786,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { - ASSERT(result->info.uid != 0); + ASSERT(result->info.id.uid != 0); return result; } @@ -1009,7 +1009,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU SDataBlockInfo* pBInfo = &pBlock->info; int32_t rows = 0; - tsdbRetrieveDataBlockInfo(pReader, &rows, &pBInfo->uid, &pBInfo->window); + tsdbRetrieveDataBlockInfo(pReader, &rows, &pBInfo->id.uid, &pBInfo->window); SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL); blockDataEnsureCapacity(pBlock, rows); @@ -1018,7 +1018,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU relocateColumnData(pBlock, pTableScanInfo->base.matchInfo.pList, pCols, true); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, rows); - pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->uid); + pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->id.uid); } tsdbReaderClose(pReader); @@ -1175,7 +1175,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 pResult->info.calWin = pInfo->updateWin; return pResult; } - } else if (pResult->info.groupId == pInfo->groupId) { + } else if (pResult->info.id.groupId == pInfo->groupId) { pResult->info.calWin = pInfo->updateWin; return pResult; } @@ -1366,7 +1366,7 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { if (pBlock == NULL || pBlock->info.rows == 0) return; void* tbname = NULL; - if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { + if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -1399,8 +1399,8 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { pBlock->info.parTbName[0] = 0; } - if (pBlock->info.groupId && pBlock->info.parTbName[0]) { - streamStatePutParName(pState, pBlock->info.groupId, pBlock->info.parTbName); + if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { + streamStatePutParName(pState, pBlock->info.id.groupId, pBlock->info.parTbName); } blockDataDestroy(pSrcBlock); @@ -1434,7 +1434,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; - bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid); + bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid); for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; @@ -1445,18 +1445,18 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock isClosed = isCloseWindow(&win, &pInfo->twAggSup); } // must check update info first. - bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); + bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]); bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && - isDeletedStreamWindow(&win, pBlock->info.groupId, + isDeletedStreamWindow(&win, pBlock->info.id.groupId, pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup); if ((update || closedWin) && out) { qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); uint64_t gpId = 0; - appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId, + appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId, NULL); if (closedWin && pInfo->partitionSup.needCalc) { gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId); - appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId, + appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId, NULL); } } @@ -1476,11 +1476,11 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); pInfo->pRes->info.rows = pBlock->info.rows; - pInfo->pRes->info.uid = pBlock->info.uid; + pInfo->pRes->info.id.uid = pBlock->info.id.uid; pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.version = pBlock->info.version; - pInfo->pRes->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); + pInfo->pRes->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { @@ -1807,8 +1807,8 @@ FETCH_NEXT_BLOCK: int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); - if (pBlock->info.groupId && pBlock->info.parTbName[0]) { - streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, pBlock->info.parTbName); + if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { + streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName); } // TODO move into scan pBlock->info.calWin.skey = INT64_MIN; @@ -1960,7 +1960,7 @@ FETCH_NEXT_BLOCK: setBlockIntoRes(pInfo, &block, false); - if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, + if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId, pInfo->pRes->info.version)) { printDataBlock(pInfo->pRes, "stream scan ignore"); blockDataCleanup(pInfo->pRes); @@ -2050,7 +2050,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { } int32_t rows = 0; - tsdbRetrieveDataBlockInfo(pInfo->dataReader, &rows, &pBlock->info.uid, &pBlock->info.window); + tsdbRetrieveDataBlockInfo(pInfo->dataReader, &rows, &pBlock->info.id.uid, &pBlock->info.window); pBlock->info.rows = rows; SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); @@ -2059,9 +2059,9 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, terrno); } - qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.uid); + qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid); pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; - pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid; + pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid; pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; return pBlock; } @@ -2555,7 +2555,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { blockDataCleanup(pBlock); int32_t rows = 0; - tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window); + tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.id.uid, &pBlock->info.window); blockDataEnsureCapacity(pBlock, rows); pBlock->info.rows = rows; @@ -2577,7 +2577,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { continue; } - pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); + pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -2773,7 +2773,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); if (pBlock != NULL) { - pBlock->info.groupId = pInfo->groupId; + pBlock->info.id.groupId = pInfo->groupId; pOperator->resultInfo.totalRows += pBlock->info.rows; return pBlock; } else { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index f2c8dc5083..ec754f31b0 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -359,7 +359,7 @@ SSDataBlock* fetchNextGroupSortDataBlock(void* param) { SOperatorInfo* childOp = source->childOpInfo; SSDataBlock* block = childOp->fpSet.getNextFn(childOp); if (block != NULL) { - if (block->info.groupId == grpSortOpInfo->currGroupId) { + if (block->info.id.groupId == grpSortOpInfo->currGroupId) { grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; return block; } else { @@ -439,7 +439,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); return NULL; } - pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; + pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId; pInfo->childOpStatus = CHILD_OP_NEW_GROUP; beginSortGroup(pOperator); } @@ -451,13 +451,13 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pInfo); if (pBlock != NULL) { - pBlock->info.groupId = pInfo->currGroupId; + pBlock->info.id.groupId = pInfo->currGroupId; pOperator->resultInfo.totalRows += pBlock->info.rows; return pBlock; } else { if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) { finishSortGroup(pOperator); - pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; + pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId; beginSortGroup(pOperator); } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) { finishSortGroup(pOperator); @@ -691,10 +691,10 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData pInfo->limitInfo.numOfOutputRows += p->info.rows; pDataBlock->info.rows = p->info.rows; - pDataBlock->info.groupId = pInfo->groupId; + pDataBlock->info.id.groupId = pInfo->groupId; } - qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.groupId, + qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId, pDataBlock->info.rows); return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 7ef2668804..3d35326749 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1831,39 +1831,39 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con } static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } - SBlockDistInfo* pBlockScanInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SBlockDistInfo* pBlockScanInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; - int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, - (int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo)); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } + STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; + int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, + (int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } - tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); - blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle); + tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); + blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle); - SSDataBlock* pBlock = pBlockScanInfo->pResBlock; + SSDataBlock* pBlock = pBlockScanInfo->pResBlock; - int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId; - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId); + int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId; + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId); - int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); - char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); - tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); - varDataSetLen(p, len); + int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); + char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); + tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); + varDataSetLen(p, len); - colDataAppend(pColInfo, 0, p, false); - taosMemoryFree(p); + colDataAppend(pColInfo, 0, p, false); + taosMemoryFree(p); - pBlock->info.rows = 1; - pOperator->status = OP_EXEC_DONE; - return pBlock; + pBlock->info.rows = 1; + pOperator->status = OP_EXEC_DONE; + return pBlock; } static void destroyBlockDistScanOperatorInfo(void* param) { diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index ba826a23d2..ec02633563 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1092,7 +1092,7 @@ static bool checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t group } static void buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock) { - uint64_t groupId = pBlock->info.groupId; + uint64_t groupId = pBlock->info.id.groupId; if (pFillSup->hasDelete && !checkResult(pFillSup, ts, groupId)) { return; } @@ -1131,7 +1131,7 @@ static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo* static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) { while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { - uint64_t groupId = pBlock->info.groupId; + uint64_t groupId = pBlock->info.id.groupId; SWinKey key = {.groupId = groupId, .ts = pFillInfo->current}; if (pFillSup->hasDelete && !checkResult(pFillSup, pFillInfo->current, groupId)) { pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, @@ -1230,7 +1230,7 @@ void keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol, int32_t rowId, SSDataBlock* pRes) { - uint64_t groupId = pBlock->info.groupId; + uint64_t groupId = pBlock->info.id.groupId; getWindowFromDiscBuf(pOperator, tsCol[rowId], groupId, pFillSup); if (pFillSup->prev.key == pFillInfo->preRowKey) { resetFillWindow(&pFillSup->prev); @@ -1245,9 +1245,9 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { SStreamFillSupporter* pFillSup = pInfo->pFillSup; SStreamFillInfo* pFillInfo = pInfo->pFillInfo; SSDataBlock* pBlock = pInfo->pSrcBlock; - uint64_t groupId = pBlock->info.groupId; + uint64_t groupId = pBlock->info.id.groupId; SSDataBlock* pRes = pInfo->pRes; - pRes->info.groupId = groupId; + pRes->info.id.groupId = groupId; if (hasRemainCalc(pFillInfo)) { doStreamFillRange(pFillInfo, pFillSup, pRes); } @@ -1342,14 +1342,14 @@ static void doDeleteFillFinalize(SOperatorInfo* pOperator) { tSimpleHashClear(pInfo->pFillSup->pResMap); for (; pFillInfo->delIndex < size; pFillInfo->delIndex++) { STimeRange* range = taosArrayGet(pFillInfo->delRanges, pFillInfo->delIndex); - if (pInfo->pRes->info.groupId != 0 && pInfo->pRes->info.groupId != range->groupId) { + if (pInfo->pRes->info.id.groupId != 0 && pInfo->pRes->info.id.groupId != range->groupId) { return; } getWindowFromDiscBuf(pOperator, range->skey, range->groupId, pInfo->pFillSup); setDeleteFillValueInfo(range->skey, range->ekey, pInfo->pFillSup, pInfo->pFillInfo); if (pInfo->pFillInfo->needFill) { doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); - pInfo->pRes->info.groupId = range->groupId; + pInfo->pRes->info.id.groupId = range->groupId; } SWinKey key = {.ts = range->skey, .groupId = range->groupId}; streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key); @@ -1435,7 +1435,7 @@ static void doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock pSup = &pInfo->pFillSup->notFillExprSup; setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false); projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL); - pDstBlock->info.groupId = pSrcBlock->info.groupId; + pDstBlock->info.id.groupId = pSrcBlock->info.id.groupId; blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0e0ec5b339..a6a477a9e3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -655,7 +655,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0); int64_t prevTs = *(int64_t*)pTsKey->pData; - if (groupId == pBlock->info.groupId) { + if (groupId == pBlock->info.id.groupId) { doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP, pSup); } @@ -927,7 +927,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t startPos = 0; int32_t numOfOutput = pSup->numOfExprs; int64_t* tsCols = extractTsCol(pBlock, pInfo); - uint64_t tableGroupId = pBlock->info.groupId; + uint64_t tableGroupId = pBlock->info.id.groupId; bool ascScan = (pInfo->inputOrder == TSDB_ORDER_ASC); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; @@ -1112,7 +1112,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI SExprSupp* pSup = &pOperator->exprSupp; SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId); - int64_t gid = pBlock->info.groupId; + int64_t gid = pBlock->info.id.groupId; bool masterScan = true; int32_t numOfOutput = pOperator->exprSupp.numOfExprs; @@ -1829,7 +1829,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator bool masterScan = true; int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - int64_t gid = pBlock->info.groupId; + int64_t gid = pBlock->info.id.groupId; int64_t gap = pInfo->gap; @@ -2333,7 +2333,7 @@ void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* } // clear the existed group id - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; buildDataBlockFromGroupRes(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); } @@ -2573,7 +2573,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); } setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap); if (IS_FINAL_OP(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2591,7 +2591,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.groupId, NULL); + doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.id.groupId, NULL); } maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.watermark); @@ -3086,7 +3086,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - uint64_t groupId = pSDataBlock->info.groupId; + uint64_t groupId = pSDataBlock->info.id.groupId; int64_t code = TSDB_CODE_SUCCESS; SResultRow* pResult = NULL; int32_t rows = pSDataBlock->info.rows; @@ -3377,7 +3377,7 @@ void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroup } // clear the existed group id - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); } @@ -3854,7 +3854,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamStateAggOperatorInfo* pInfo = pOperator->info; int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - int64_t groupId = pSDataBlock->info.groupId; + int64_t groupId = pSDataBlock->info.id.groupId; int64_t code = TSDB_CODE_SUCCESS; TSKEY* tsCols = NULL; SResultRow* pResult = NULL; @@ -4178,7 +4178,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) { - pRes->info.groupId = pMiaInfo->groupId; + pRes->info.id.groupId = pMiaInfo->groupId; pMiaInfo->curTs = INT64_MIN; pMiaInfo->groupId = 0; } @@ -4203,7 +4203,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { pBlock = pMiaInfo->prefetchedBlock; pMiaInfo->prefetchedBlock = NULL; - pMiaInfo->groupId = pBlock->info.groupId; + pMiaInfo->groupId = pBlock->info.id.groupId; } // no data exists, all query processing is done @@ -4220,12 +4220,12 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { } if (pMiaInfo->groupId == 0) { - if (pMiaInfo->groupId != pBlock->info.groupId) { - pMiaInfo->groupId = pBlock->info.groupId; - pRes->info.groupId = pMiaInfo->groupId; + if (pMiaInfo->groupId != pBlock->info.id.groupId) { + pMiaInfo->groupId = pBlock->info.id.groupId; + pRes->info.id.groupId = pMiaInfo->groupId; } } else { - if (pMiaInfo->groupId != pBlock->info.groupId) { + if (pMiaInfo->groupId != pBlock->info.id.groupId) { // if there are unclosed time window, close it firstly. ASSERT(pMiaInfo->curTs != INT64_MIN); finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo); @@ -4236,7 +4236,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { break; } else { // continue - pRes->info.groupId = pMiaInfo->groupId; + pRes->info.id.groupId = pMiaInfo->groupId; } } @@ -4443,7 +4443,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* int32_t startPos = 0; int32_t numOfOutput = pExprSup->numOfExprs; int64_t* tsCols = extractTsCol(pBlock, iaInfo); - uint64_t tableGroupId = pBlock->info.groupId; + uint64_t tableGroupId = pBlock->info.id.groupId; bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; @@ -4549,7 +4549,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { pBlock = downstream->fpSet.getNextFn(downstream); } else { pBlock = miaInfo->prefetchedBlock; - miaInfo->groupId = pBlock->info.groupId; + miaInfo->groupId = pBlock->info.id.groupId; miaInfo->prefetchedBlock = NULL; } @@ -4561,8 +4561,8 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { if (!miaInfo->hasGroupId) { miaInfo->hasGroupId = true; - miaInfo->groupId = pBlock->info.groupId; - } else if (miaInfo->groupId != pBlock->info.groupId) { + miaInfo->groupId = pBlock->info.id.groupId; + } else if (miaInfo->groupId != pBlock->info.id.groupId) { miaInfo->prefetchedBlock = pBlock; break; } @@ -4576,7 +4576,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { } } - pRes->info.groupId = miaInfo->groupId; + pRes->info.id.groupId = miaInfo->groupId; } if (miaInfo->inputBlocksFinished) { @@ -4585,7 +4585,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { if (listNode != NULL) { SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data); // finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes); - pRes->info.groupId = grpWin->groupId; + pRes->info.id.groupId = grpWin->groupId; } } @@ -4744,7 +4744,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); minTs = TMIN(minTs, pBlock->info.window.skey); - doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap); } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index e0a0b9442e..3f91142708 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -417,8 +417,8 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { SSDataBlock* pRightBlock = pRightSource->src.pBlock; if (pParam->cmpGroupId) { - if (pLeftBlock->info.groupId != pRightBlock->info.groupId) { - return pLeftBlock->info.groupId < pRightBlock->info.groupId ? -1 : 1; + if (pLeftBlock->info.id.groupId != pRightBlock->info.id.groupId) { + return pLeftBlock->info.id.groupId < pRightBlock->info.id.groupId ? -1 : 1; } } @@ -826,7 +826,7 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) { } } -uint64_t tsortGetGroupId(STupleHandle* pVHandle) { return pVHandle->pBlock->info.groupId; } +uint64_t tsortGetGroupId(STupleHandle* pVHandle) { return pVHandle->pBlock->info.id.groupId; } SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) { SSortExecInfo info = {0}; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 8715aa0be1..c78ec5b999 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1096,7 +1096,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { SSDataBlock *pTempBlock = createDataBlock(); pTempBlock->info.rows = pInput->totalRows; - pTempBlock->info.uid = pInput->uid; + pTempBlock->info.id.uid = pInput->uid; for (int32_t i = 0; i < numOfCols; ++i) { blockDataAppendColInfo(pTempBlock, pInput->pData[i]); } diff --git a/source/libs/index/test/index_executor_tests.cpp b/source/libs/index/test/index_executor_tests.cpp index bcc474dc8b..c8a7ca98f0 100644 --- a/source/libs/index/test/index_executor_tests.cpp +++ b/source/libs/index/test/index_executor_tests.cpp @@ -78,7 +78,7 @@ void sifAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *sl blockDataEnsureCapacity(res, rows); *dataBlockId = taosArrayGetSize(pBlockList) - 1; - res->info.blockId = *dataBlockId; + res->info.id.blockId = *dataBlockId; *slotId = 0; } else { SSDataBlock *res = *(SSDataBlock **)taosArrayGetLast(pBlockList); diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index d1271e9290..55f33b7a3e 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -378,7 +378,7 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t int32_t index = -1; for (int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) { SSDataBlock *pb = taosArrayGetP(ctx->pBlockList, i); - if (pb->info.blockId == ref->dataBlockId) { + if (pb->info.id.blockId == ref->dataBlockId) { index = i; break; } @@ -1384,7 +1384,7 @@ EDealRes sclWalkTarget(SNode *pNode, SScalarCtx *ctx) { int32_t index = -1; for (int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) { SSDataBlock *pb = taosArrayGetP(ctx->pBlockList, i); - if (pb->info.blockId == target->dataBlockId) { + if (pb->info.id.blockId == target->dataBlockId) { index = i; break; } diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index 0fd0c98c1a..dae26d3d58 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -98,7 +98,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s taosArrayPush(pBlockList, &res); *dataBlockId = taosArrayGetSize(pBlockList) - 1; - res->info.blockId = *dataBlockId; + res->info.id.blockId = *dataBlockId; *slotId = 0; } else { SSDataBlock *res = *(SSDataBlock **)taosArrayGetLast(pBlockList); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 573a1ea31f..2c36c299ee 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -463,7 +463,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat continue; } - if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.groupId) < 0) { + if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.id.groupId) < 0) { goto FAIL_SHUFFLE_DISPATCH; } } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 15526cd8bb..1589fddda4 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -166,7 +166,7 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) { TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) { if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN; TSKEY maxTs = INT64_MIN; - int64_t tbUid = pBlock->info.uid; + int64_t tbUid = pBlock->info.id.uid; SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol); diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 7264e0b5ff..ea67986da2 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -210,6 +210,7 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) { ret = tdbGetFileSize(pPager->fd, pPager->pageSize, &(pPager->dbOrigSize)); pPager->dbFileSize = pPager->dbOrigSize; + tdbTrace("pager/open reset dirty tree: %p", &pPager->rbt); tRBTreeCreate(&pPager->rbt, pageCmpFn); *ppPager = pPager; @@ -296,7 +297,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { // ref page one more time so the page will not be release tdbRefPage(pPage); - tdbDebug("pcache/mdirty page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id); + tdbDebug("pager/mdirty page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id); // Set page as dirty pPage->isDirty = 1; @@ -316,6 +317,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { pPage->pDirtyNext = *ppPage; *ppPage = pPage; */ + tdbTrace("put page: %p %d to dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt); tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage); // Write page to journal if neccessary @@ -373,6 +375,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { SRBTreeNode *pNode = NULL; while ((pNode = tRBTreeIterNext(&iter)) != NULL) { pPage = (SPage *)pNode; + ASSERT(pPage->nOverflow == 0); ret = tdbPagerWritePageToDB(pPager, pPage); if (ret < 0) { @@ -398,6 +401,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { tdbPCacheRelease(pPager->pCache, pPage, pTxn); } + tdbTrace("pager/commit reset dirty tree: %p", &pPager->rbt); tRBTreeCreate(&pPager->rbt, pageCmpFn); // sync the db file @@ -471,6 +475,7 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { tdbPCacheRelease(pPager->pCache, pPage, pTxn); } /* + tdbTrace("reset dirty tree: %p", &pPager->rbt); tRBTreeCreate(&pPager->rbt, pageCmpFn); // sync the db file @@ -566,6 +571,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { tdbPCacheRelease(pPager->pCache, pPage, pTxn); } + tdbTrace("reset dirty tree: %p", &pPager->rbt); tRBTreeCreate(&pPager->rbt, pageCmpFn); // 4, remove the journal file @@ -580,6 +586,8 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) { SPage *pPage; + i32 nRef; + SPgno maxPgno = pPager->dbOrigSize; int ret; // loop to write the dirty pages to file @@ -587,29 +595,52 @@ int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) { SRBTreeNode *pNode = NULL; while ((pNode = tRBTreeIterNext(&iter)) != NULL) { pPage = (SPage *)pNode; + nRef = tdbGetPageRef(pPage); + if (nRef > 1) { + continue; + } + + SPgno pgno = TDB_PAGE_PGNO(pPage); + if (pgno > maxPgno) { + maxPgno = pgno; + } ret = tdbPagerWritePageToDB(pPager, pPage); if (ret < 0) { tdbError("failed to write page to db since %s", tstrerror(terrno)); return -1; } - } - tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize); - pPager->dbOrigSize = pPager->dbFileSize; + tdbTrace("tdb/flush:%p, %d/%d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize, maxPgno); + pPager->dbOrigSize = maxPgno; + + pPage->isDirty = 0; + + tdbTrace("pager/flush drop page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt); + tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); + tdbPCacheRelease(pPager->pCache, pPage, pTxn); + + break; + } + /* + tdbTrace("tdb/flush:%p, %d/%d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize, maxPgno); + pPager->dbOrigSize = maxPgno; // release the page iter = tRBTreeIterCreate(&pPager->rbt, 1); while ((pNode = tRBTreeIterNext(&iter)) != NULL) { pPage = (SPage *)pNode; + nRef = tdbGetPageRef(pPage); + if (nRef > 1) { + continue; + } pPage->isDirty = 0; + tdbTrace("pager/flush drop page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt); tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tdbPCacheRelease(pPager->pCache, pPage, pTxn); } - - tRBTreeCreate(&pPager->rbt, pageCmpFn); - + */ return 0; } diff --git a/source/util/src/trbtree.c b/source/util/src/trbtree.c index cbbf53d8f1..c6aea87470 100644 --- a/source/util/src/trbtree.c +++ b/source/util/src/trbtree.c @@ -104,61 +104,6 @@ static void tRBTreeTransplant(SRBTree *pTree, SRBTreeNode *u, SRBTreeNode *v) { v->parent = u->parent; } -static void tRBTreeDropFix(SRBTree *pTree, SRBTreeNode *x) { - while (x != pTree->root && x->color == BLACK) { - if (x == x->parent->left) { - SRBTreeNode *w = x->parent->right; - if (w->color == RED) { - w->color = BLACK; - x->parent->color = RED; - tRBTreeRotateLeft(pTree, x->parent); - w = x->parent->right; - } - if (w->left->color == BLACK && w->right->color == BLACK) { - w->color = RED; - x = x->parent; - } else { - if (w->right->color == BLACK) { - w->left->color = BLACK; - w->color = RED; - tRBTreeRotateRight(pTree, w); - w = x->parent->right; - } - w->color = x->parent->color; - x->parent->color = BLACK; - w->right->color = BLACK; - tRBTreeRotateLeft(pTree, x->parent); - x = pTree->root; - } - } else { - SRBTreeNode *w = x->parent->left; - if (w->color == RED) { - w->color = BLACK; - x->parent->color = RED; - tRBTreeRotateRight(pTree, x->parent); - w = x->parent->left; - } - if (w->right->color == BLACK && w->left->color == BLACK) { - w->color = RED; - x = x->parent; - } else { - if (w->left->color == BLACK) { - w->right->color = BLACK; - w->color = RED; - tRBTreeRotateLeft(pTree, w); - w = x->parent->left; - } - w->color = x->parent->color; - x->parent->color = BLACK; - w->left->color = BLACK; - tRBTreeRotateRight(pTree, x->parent); - x = pTree->root; - } - } - } - x->color = BLACK; -} - static SRBTreeNode *tRBTreeSuccessor(SRBTree *pTree, SRBTreeNode *pNode) { if (pNode->right != pTree->NIL) { pNode = pNode->right; @@ -255,11 +200,205 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) { return z; } -void tRBTreeDrop(SRBTree *pTree, SRBTreeNode *z) { - SRBTreeNode *y = z; - SRBTreeNode *x; - ECOLOR y_orignal_color = y->color; +#define RBTREE_NULL rbtree->NIL +#define rbtree_t SRBTree +#define rbnode_t SRBTreeNode +#define rbtree_rotate_left tRBTreeRotateLeft +#define rbtree_rotate_right tRBTreeRotateRight +static void rbtree_delete_fixup(rbtree_t *rbtree, rbnode_t *child, rbnode_t *child_parent) { + rbnode_t *sibling; + int go_up = 1; + + /* determine sibling to the node that is one-black short */ + if (child_parent->right == child) + sibling = child_parent->left; + else + sibling = child_parent->right; + + while (go_up) { + if (child_parent == RBTREE_NULL) { + /* removed parent==black from root, every path, so ok */ + return; + } + + if (sibling->color == RED) { /* rotate to get a black sibling */ + child_parent->color = RED; + sibling->color = BLACK; + if (child_parent->right == child) + rbtree_rotate_right(rbtree, child_parent); + else + rbtree_rotate_left(rbtree, child_parent); + /* new sibling after rotation */ + if (child_parent->right == child) + sibling = child_parent->left; + else + sibling = child_parent->right; + } + + if (child_parent->color == BLACK && sibling->color == BLACK && sibling->left->color == BLACK && + sibling->right->color == BLACK) { /* fixup local with recolor of sibling */ + if (sibling != RBTREE_NULL) sibling->color = RED; + + child = child_parent; + child_parent = child_parent->parent; + /* prepare to go up, new sibling */ + if (child_parent->right == child) + sibling = child_parent->left; + else + sibling = child_parent->right; + } else + go_up = 0; + } + + if (child_parent->color == RED && sibling->color == BLACK && sibling->left->color == BLACK && + sibling->right->color == BLACK) { + /* move red to sibling to rebalance */ + if (sibling != RBTREE_NULL) sibling->color = RED; + child_parent->color = BLACK; + return; + } + assert(sibling != RBTREE_NULL); + + /* get a new sibling, by rotating at sibling. See which child + of sibling is red */ + if (child_parent->right == child && sibling->color == BLACK && sibling->right->color == RED && + sibling->left->color == BLACK) { + sibling->color = RED; + sibling->right->color = BLACK; + rbtree_rotate_left(rbtree, sibling); + /* new sibling after rotation */ + if (child_parent->right == child) + sibling = child_parent->left; + else + sibling = child_parent->right; + } else if (child_parent->left == child && sibling->color == BLACK && sibling->left->color == RED && + sibling->right->color == BLACK) { + sibling->color = RED; + sibling->left->color = BLACK; + rbtree_rotate_right(rbtree, sibling); + /* new sibling after rotation */ + if (child_parent->right == child) + sibling = child_parent->left; + else + sibling = child_parent->right; + } + + /* now we have a black sibling with a red child. rotate and exchange colors. */ + sibling->color = child_parent->color; + child_parent->color = BLACK; + if (child_parent->right == child) { + assert(sibling->left->color == RED); + sibling->left->color = BLACK; + rbtree_rotate_right(rbtree, child_parent); + } else { + assert(sibling->right->color == RED); + sibling->right->color = BLACK; + rbtree_rotate_left(rbtree, child_parent); + } +} + +/** helpers for delete: swap node colours */ +static void swap_int8(ECOLOR *x, ECOLOR *y) { + ECOLOR t = *x; + *x = *y; + *y = t; +} + +/** helpers for delete: swap node pointers */ +static void swap_np(rbnode_t **x, rbnode_t **y) { + rbnode_t *t = *x; + *x = *y; + *y = t; +} + +/** Update parent pointers of child trees of 'parent' */ +static void change_parent_ptr(rbtree_t *rbtree, rbnode_t *parent, rbnode_t *old, rbnode_t *new) { + if (parent == RBTREE_NULL) { + assert(rbtree->root == old); + if (rbtree->root == old) rbtree->root = new; + return; + } + assert(parent->left == old || parent->right == old || parent->left == new || parent->right == new); + if (parent->left == old) parent->left = new; + if (parent->right == old) parent->right = new; +} +/** Update parent pointer of a node 'child' */ +static void change_child_ptr(rbtree_t *rbtree, rbnode_t *child, rbnode_t *old, rbnode_t *new) { + if (child == RBTREE_NULL) return; + assert(child->parent == old || child->parent == new); + if (child->parent == old) child->parent = new; +} + +rbnode_t *rbtree_delete(rbtree_t *rbtree, void *key) { + rbnode_t *to_delete = key; + rbnode_t *child; + + /* make sure we have at most one non-leaf child */ + if (to_delete->left != RBTREE_NULL && to_delete->right != RBTREE_NULL) { + /* swap with smallest from right subtree (or largest from left) */ + rbnode_t *smright = to_delete->right; + while (smright->left != RBTREE_NULL) smright = smright->left; + /* swap the smright and to_delete elements in the tree, + * but the rbnode_t is first part of user data struct + * so cannot just swap the keys and data pointers. Instead + * readjust the pointers left,right,parent */ + + /* swap colors - colors are tied to the position in the tree */ + swap_int8(&to_delete->color, &smright->color); + + /* swap child pointers in parents of smright/to_delete */ + change_parent_ptr(rbtree, to_delete->parent, to_delete, smright); + if (to_delete->right != smright) change_parent_ptr(rbtree, smright->parent, smright, to_delete); + + /* swap parent pointers in children of smright/to_delete */ + change_child_ptr(rbtree, smright->left, smright, to_delete); + change_child_ptr(rbtree, smright->left, smright, to_delete); + change_child_ptr(rbtree, smright->right, smright, to_delete); + change_child_ptr(rbtree, smright->right, smright, to_delete); + change_child_ptr(rbtree, to_delete->left, to_delete, smright); + if (to_delete->right != smright) change_child_ptr(rbtree, to_delete->right, to_delete, smright); + if (to_delete->right == smright) { + /* set up so after swap they work */ + to_delete->right = to_delete; + smright->parent = smright; + } + + /* swap pointers in to_delete/smright nodes */ + swap_np(&to_delete->parent, &smright->parent); + swap_np(&to_delete->left, &smright->left); + swap_np(&to_delete->right, &smright->right); + + /* now delete to_delete (which is at the location where the smright previously was) */ + } + assert(to_delete->left == RBTREE_NULL || to_delete->right == RBTREE_NULL); + + if (to_delete->left != RBTREE_NULL) + child = to_delete->left; + else + child = to_delete->right; + + /* unlink to_delete from the tree, replace to_delete with child */ + change_parent_ptr(rbtree, to_delete->parent, to_delete, child); + change_child_ptr(rbtree, child, to_delete, to_delete->parent); + + if (to_delete->color == RED) { + /* if node is red then the child (black) can be swapped in */ + } else if (child->color == RED) { + /* change child to BLACK, removing a RED node is no problem */ + if (child != RBTREE_NULL) child->color = BLACK; + } else + rbtree_delete_fixup(rbtree, child, to_delete->parent); + + /* unlink completely */ + to_delete->parent = RBTREE_NULL; + to_delete->left = RBTREE_NULL; + to_delete->right = RBTREE_NULL; + to_delete->color = BLACK; + return to_delete; +} + +void tRBTreeDrop(SRBTree *pTree, SRBTreeNode *z) { // update min/max node if (pTree->min == z) { pTree->min = tRBTreeSuccessor(pTree, pTree->min); @@ -268,34 +407,8 @@ void tRBTreeDrop(SRBTree *pTree, SRBTreeNode *z) { pTree->max = tRBTreePredecessor(pTree, pTree->max); } - // drop impl - if (z->left == pTree->NIL) { - x = z->right; - tRBTreeTransplant(pTree, z, z->right); - } else if (z->right == pTree->NIL) { - x = z->left; - tRBTreeTransplant(pTree, z, z->left); - } else { - y = tRBTreeSuccessor(pTree, z); - y_orignal_color = y->color; - x = y->right; - if (y->parent == z) { - x->parent = z; - } else { - tRBTreeTransplant(pTree, y, y->right); - y->right = z->right; - y->right->parent = y; - } - tRBTreeTransplant(pTree, z, y); - y->left = z->left; - y->left->parent = y; - y->color = z->color; - } + rbtree_delete(pTree, z); - // fix - if (y_orignal_color == BLACK) { - tRBTreeDropFix(pTree, x); - } pTree->n--; } @@ -343,4 +456,4 @@ SRBTreeNode *tRBTreeIterNext(SRBTreeIter *pIter) { _exit: return (pNode == pIter->pTree->NIL) ? NULL : pNode; -} \ No newline at end of file +} diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 580a298588..0abec6b340 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -409,7 +409,7 @@ ,,n,system-test,python3 ./test.py -f 0-others/taosdShell.py -N 5 -M 3 -Q 3 ,,n,system-test,python3 ./test.py -f 0-others/udfTest.py ,,n,system-test,python3 ./test.py -f 0-others/udf_create.py -,,n,system-test,python3 ./test.py -f 0-others/udf_restart_taosd.py +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/udf_restart_taosd.py ,,n,system-test,python3 ./test.py -f 0-others/udf_cfg1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/udf_cfg2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/cachemodel.py @@ -425,7 +425,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_table.py -,,n,system-test,python3 ./test.py -f 1-insert/boundary.py +,,,system-test,python3 ./test.py -f 1-insert/boundary.py ,,n,system-test,python3 ./test.py -f 1-insert/insertWithMoreVgroup.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_comment.py ,,n,system-test,python3 ./test.py -f 1-insert/time_range_wise.py