diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 9c1f2063a7..77f1879b81 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -174,15 +174,28 @@ typedef struct SColumnDataAgg { } SColumnDataAgg; #pragma pack(pop) +typedef struct SBlockID { + // The uid of table, from which current data block comes. And it is always 0, if current block is the + // result of calculation. + uint64_t uid; + + // Block id, acquired and assigned from executor, which created according to the hysical planner. Block id is used + // to mark the stage of exec task. + uint64_t blockId; + + // Generated by group/partition by [value|tags]. Created and assigned by table-scan operator, group-by operator, + // and partition by operator. + uint64_t groupId; +} SBlockID; + typedef struct SDataBlockInfo { STimeWindow window; - int32_t rows; // todo hide this attribute int32_t rowSize; - uint64_t uid; // the uid of table, from which current data block comes - uint16_t blockId; // block id, generated by physical planner - uint64_t groupId; - int16_t hasVarCol; + int32_t rows; // todo hide this attribute uint32_t capacity; + SBlockID id; + int16_t hasVarCol; + // TODO: optimize and remove following int64_t version; // used for stream, and need serialization int32_t childId; // used for stream, do not serialize @@ -190,8 +203,8 @@ typedef struct SDataBlockInfo { STimeWindow calWin; // used for stream, do not serialize TSKEY watermark; // used for stream - char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition - STag* pTag; // used for stream partition + char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition + STag* pTag; // used for stream partition } SDataBlockInfo; typedef struct SSDataBlock { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index cfa2964e16..a0f795a729 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -621,7 +621,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { // todo remove this int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) { pBlock->info.rows = *(int32_t*)buf; - pBlock->info.groupId = *(uint64_t*)(buf + sizeof(int32_t)); + pBlock->info.id.groupId = *(uint64_t*)(buf + sizeof(int32_t)); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -1140,7 +1140,8 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { SDataBlockInfo* pInfo = &pDataBlock->info; pInfo->rows = 0; - pInfo->groupId = 0; + pInfo->id.uid = 0; + pInfo->id.groupId = 0; pInfo->window.ekey = 0; pInfo->window.skey = 0; @@ -1334,7 +1335,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) { SSDataBlock* createSpecialDataBlock(EStreamType type) { SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); pBlock->info.hasVarCol = false; - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; pBlock->info.rows = 0; pBlock->info.type = type; pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + @@ -1675,7 +1676,7 @@ int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) { } int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { - int64_t tbUid = pBlock->info.uid; + int64_t tbUid = pBlock->info.id.uid; int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int16_t hasVarCol = pBlock->info.hasVarCol; int32_t rows = pBlock->info.rows; @@ -1713,7 +1714,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - buf = taosDecodeFixedU64(buf, &pBlock->info.uid); + buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid); buf = taosDecodeFixedI16(buf, &numOfCols); buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol); buf = taosDecodeFixedI32(buf, &pBlock->info.rows); @@ -1834,7 +1835,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { int32_t rows = pDataBlock->info.rows; printf("%s |block ver %" PRIi64 " |block type %d |child id %d|group id %" PRIu64 "\n", flag, pDataBlock->info.version, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, - pDataBlock->info.groupId); + pDataBlock->info.id.groupId); for (int32_t j = 0; j < rows; j++) { printf("%s |", flag); for (int32_t k = 0; k < numOfCols; k++) { @@ -1905,8 +1906,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) len += snprintf(dumpBuf + len, size - len, "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%d|version:%" PRIu64 "\n", - flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId, - pDataBlock->info.uid, pDataBlock->info.rows, pDataBlock->info.version); + flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, + pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version); if (len >= size - 1) return dumpBuf; for (int32_t j = 0; j < rows; j++) { @@ -2035,8 +2036,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB for (int32_t i = 0; i < sz; ++i) { int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; - // int32_t rowSize = pDataBlock->info.rowSize; - // int64_t groupId = pDataBlock->info.groupId; if (colNum <= 1) { // invalid if only with TS col @@ -2049,7 +2048,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen); pSubmitBlk->suid = suid; - pSubmitBlk->uid = pDataBlock->info.groupId; + pSubmitBlk->uid = pDataBlock->info.id.groupId; pSubmitBlk->numOfRows = rows; pSubmitBlk->sversion = pTSchema->version; @@ -2292,7 +2291,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { } *actualLen = dataLen; - *groupId = pBlock->info.groupId; + *groupId = pBlock->info.id.groupId; ASSERT(dataLen > 0); uDebug("build data block, actualLen:%d, rows:%d, cols:%d", dataLen, *rows, *cols); @@ -2325,7 +2324,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += sizeof(int32_t); // group id sizeof(uint64_t) - pBlock->info.groupId = *(uint64_t*)pStart; + pBlock->info.id.groupId = *(uint64_t*)pStart; pStart += sizeof(uint64_t); if (pBlock->pDataBlock == NULL) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 51842a8ae4..6bd2ae3435 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -708,7 +708,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma #endif for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { SSDataBlock *output = taosArrayGetP(pResList, i); - smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.uid, output->info.groupId, + smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.id.uid, output->info.id.groupId, output->info.rows); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); @@ -718,7 +718,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%" PRIu64 ", level %" PRIi8 " failed since %s", - SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr()); + SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr()); goto _err; } @@ -726,13 +726,13 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma taosMemoryFreeClear(pReq); smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8 " failed since %s", - SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr()); + SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr()); goto _err; } smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64 " len %" PRIu32, - SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version, + SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version, htonl(pReq->header.contLen)); taosMemoryFreeClear(pReq); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index e41b1d8aa8..c3a4cefc66 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -530,7 +530,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter); - pBlock->info.uid = pReader->msgIter.uid; + pBlock->info.id.uid = pReader->msgIter.uid; pBlock->info.rows = pReader->msgIter.numOfRows; pBlock->info.version = pReader->pMsg->version; @@ -649,7 +649,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas } SSDataBlock* pBlock = taosArrayGetLast(blocks); - pBlock->info.uid = pReader->msgIter.uid; + pBlock->info.id.uid = pReader->msgIter.uid; pBlock->info.rows = 0; pBlock->info.version = pReader->pMsg->version; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 30cde5d475..a4bfb6c876 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -103,7 +103,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem // STagVal tagVal = { // .cid = pTagSchemaWrapper->pSchema[j].colId, // .type = pTagSchemaWrapper->pSchema[j].type, - // .i64 = (int64_t)pDataBlock->info.groupId, + // .i64 = (int64_t)pDataBlock->info.id.groupId, // }; // taosArrayPush(tagArray, &tagVal); // taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name); @@ -134,7 +134,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem STagVal tagVal = { .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.groupId, + .i64 = (int64_t)pDataBlock->info.id.groupId, }; taosArrayPush(tagArray, &tagVal); createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); @@ -161,7 +161,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem if (pDataBlock->info.parTbName[0]) { createTbReq.name = strdup(pDataBlock->info.parTbName); } else { - createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); + createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); } // save schema len @@ -358,7 +358,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d if (pDataBlock->info.parTbName[0]) { ctbName = strdup(pDataBlock->info.parTbName); } else { - ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); + ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); } int32_t schemaLen = 0; @@ -390,7 +390,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d STagVal tagVal = { .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.groupId, + .i64 = (int64_t)pDataBlock->info.id.groupId, }; taosArrayPush(tagArray, &tagVal); createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 96cfa1752d..4ba311212a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1624,7 +1624,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader); blockDataUpdateTsWindow(pBlock, 0); - pBlock->info.uid = pBlockScanInfo->uid; + pBlock->info.id.uid = pBlockScanInfo->uid; setComposedBlockFlag(pReader, true); @@ -2494,7 +2494,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } _end: - pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; + pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; blockDataUpdateTsWindow(pResBlock, 0); setComposedBlockFlag(pReader, true); @@ -2506,7 +2506,7 @@ _end: if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s", - pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, + pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } @@ -2830,7 +2830,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } else { // whole block is required, return it directly SDataBlockInfo* pInfo = &pReader->pResBlock->info; pInfo->rows = pBlock->nRow; - pInfo->uid = pScanInfo->uid; + pInfo->id.uid = pScanInfo->uid; pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts}; setComposedBlockFlag(pReader, false); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order); @@ -4020,7 +4020,7 @@ bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) { ASSERT(pReader != NULL); *rows = pReader->pResBlock->info.rows; - *uid = pReader->pResBlock->info.uid; + *uid = pReader->pResBlock->info.id.uid; *pWindow = pReader->pResBlock->info.window; } diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 0278a11f80..be101059f2 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -436,7 +436,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { pDataBlock->pBlockAgg = NULL; taosArrayGetSize(pDataBlock->pDataBlock) = tSmaNumOfCols; pDataBlock->info.rows = tSmaNumOfRows; - pDataBlock->info.groupId = tSmaGroupId + g; + pDataBlock->info.id.groupId = tSmaGroupId + g; pDataBlock->pDataBlock = taosArrayInit(tSmaNumOfCols, sizeof(SColumnInfoData *)); EXPECT_NE(pDataBlock->pDataBlock, nullptr); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index cdd744bded..c432f3c01c 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -185,7 +185,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } } - pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); + pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); pRes->info.rows = 1; SExprSupp* pSup = &pInfo->pseudoExprSup; @@ -196,7 +196,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { return NULL; } - pRes->info.groupId = getTableGroupId(pTableList, pRes->info.uid); + pRes->info.id.groupId = getTableGroupId(pTableList, pRes->info.id.uid); pInfo->indexOfBufferedRes += 1; return pRes; } else { @@ -232,12 +232,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SExprSupp* pSup = &pInfo->pseudoExprSup; STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0]; - pInfo->pRes->info.groupId = pKeyInfo->groupId; + pInfo->pRes->info.id.groupId = pKeyInfo->groupId; if (taosArrayGetSize(pInfo->pUidList) > 0) { ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); - pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); + pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 280880c077..b2ddff45a4 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -717,10 +717,10 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) { if (pLimitInfo->remainGroupOffset > 0) { if (pLimitInfo->currentGroupId == 0) { // it is the first group - pLimitInfo->currentGroupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.id.groupId; blockDataCleanup(pBlock); return PROJECT_RETRIEVE_CONTINUE; - } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) { + } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) { // now it is the data from a new group pLimitInfo->remainGroupOffset -= 1; @@ -732,11 +732,11 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa } // set current group id of the project operator - pLimitInfo->currentGroupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.id.groupId; } // here check for a new group data, we need to handle the data of the previous group. - if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) { + if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) { pLimitInfo->numOfOutputGroups += 1; if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { pOperator->status = OP_EXEC_DONE; @@ -758,7 +758,7 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa // here we reach the start position, according to the limit/offset requirements. // set current group id - pLimitInfo->currentGroupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.id.groupId; if (pLimitInfo->remainOffset >= pBlock->info.rows) { pLimitInfo->remainOffset -= pBlock->info.rows; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 08e6e4792b..b135566caa 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -213,7 +213,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { SSDataBlock* pBlock = createDataBlock(); - pBlock->info.blockId = pNode->dataBlockId; + pBlock->info.id.blockId = pNode->dataBlockId; pBlock->info.type = STREAM_INVALID; pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; pBlock->info.watermark = INT64_MIN; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5abde1be85..514236159d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -447,7 +447,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int pCtx[i].scanFlag = scanFlag; SInputColumnInfoData* pInput = &pCtx[i].input; - pInput->uid = pBlock->info.uid; + pInput->uid = pBlock->info.id.uid; pInput->colDataSMAIsSet = false; SExprInfo* pOneExpr = &pExprSup->pExprInfo[i]; @@ -506,184 +506,6 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { - size_t num = (pPseudoList != NULL) ? taosArrayGetSize(pPseudoList) : 0; - for (int32_t i = 0; i < num; ++i) { - pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i); - } -} - -int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, - int32_t numOfOutput, SArray* pPseudoList) { - setPseudoOutputColInfo(pResult, pCtx, pPseudoList); - - if (pSrcBlock == NULL) { - for (int32_t k = 0; k < numOfOutput; ++k) { - int32_t outputSlotId = pExpr[k].base.resSchema.slotId; - - ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE); - SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - - int32_t type = pExpr[k].base.pParam[0].param.nType; - if (TSDB_DATA_TYPE_NULL == type) { - colDataAppendNNULL(pColInfoData, 0, 1); - } else { - colDataAppend(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false); - } - } - - pResult->info.rows = 1; - return TSDB_CODE_SUCCESS; - } - - if (pResult != pSrcBlock) { - pResult->info.groupId = pSrcBlock->info.groupId; - memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); - } - - // if the source equals to the destination, it is to create a new column as the result of scalar - // function or some operators. - bool createNewColModel = (pResult == pSrcBlock); - if (createNewColModel) { - blockDataEnsureCapacity(pResult, pResult->info.rows); - } - - int32_t numOfRows = 0; - - for (int32_t k = 0; k < numOfOutput; ++k) { - int32_t outputSlotId = pExpr[k].base.resSchema.slotId; - SqlFunctionCtx* pfCtx = &pCtx[k]; - SInputColumnInfoData* pInputData = &pfCtx->input; - - if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query - SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - if (pResult->info.rows > 0 && !createNewColModel) { - colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0], - pInputData->numOfRows); - } else { - colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info); - } - - numOfRows = pInputData->numOfRows; - } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { - SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - - int32_t offset = createNewColModel ? 0 : pResult->info.rows; - - int32_t type = pExpr[k].base.pParam[0].param.nType; - if (TSDB_DATA_TYPE_NULL == type) { - colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows); - } else { - for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { - colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false); - } - } - - numOfRows = pSrcBlock->info.rows; - } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) { - SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); - taosArrayPush(pBlockList, &pSrcBlock); - - SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); - SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; - - SScalarParam dest = {.columnData = &idata}; - int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pBlockList); - return code; - } - - int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; - ASSERT(pResult->info.capacity > 0); - - colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows); - colDataDestroy(&idata); - - numOfRows = dest.numOfRows; - taosArrayDestroy(pBlockList); - } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { - // _rowts/_c0, not tbname column - if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) { - // do nothing - } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) { - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx); - pfCtx->fpSet.init(pfCtx, pResInfo); - - pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); - pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset - - // set the timestamp(_rowts) output buffer - if (taosArrayGetSize(pPseudoList) > 0) { - int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); - pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; - } - - // link pDstBlock to set selectivity value - if (pfCtx->subsidiaries.num > 0) { - pfCtx->pDstBlock = pResult; - } - - int32_t code = pfCtx->fpSet.process(pfCtx); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - numOfRows = pResInfo->numOfRes; - } else if (fmIsAggFunc(pfCtx->functionId)) { - // selective value output should be set during corresponding function execution - if (fmIsSelectValueFunc(pfCtx->functionId)) { - continue; - } - // _group_key function for "partition by tbname" + csum(col_name) query - SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); - int32_t slotId = pfCtx->param[0].pCol->slotId; - - // todo handle the json tag - SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId); - for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) { - bool isNull = colDataIsNull_s(pInput, f); - if (isNull) { - colDataAppendNULL(pOutput, pResult->info.rows + f); - } else { - char* data = colDataGetData(pInput, f); - colDataAppend(pOutput, pResult->info.rows + f, data, isNull); - } - } - - } else { - SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); - taosArrayPush(pBlockList, &pSrcBlock); - - SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); - SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; - - SScalarParam dest = {.columnData = &idata}; - int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pBlockList); - return code; - } - - int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; - ASSERT(pResult->info.capacity > 0); - colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows); - colDataDestroy(&idata); - - numOfRows = dest.numOfRows; - taosArrayDestroy(pBlockList); - } - } else { - return TSDB_CODE_OPS_NOT_SUPPORT; - } - } - - if (!createNewColModel) { - pResult->info.rows += numOfRows; - } - - return TSDB_CODE_SUCCESS; -} - bool functionNeedToExecute(SqlFunctionCtx* pCtx) { struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -862,7 +684,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowEntryInfoOffset); } else { - if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId, + if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -913,7 +735,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); - if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId, + if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1245,11 +1067,11 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS continue; } - if (pBlock->info.groupId == 0) { - pBlock->info.groupId = pPos->groupId; + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = pPos->groupId; } else { // current value belongs to different group, it can't be packed into one datablock - if (pBlock->info.groupId != pPos->groupId) { + if (pBlock->info.id.groupId != pPos->groupId) { releaseBufPage(pBuf, page); break; } @@ -1269,7 +1091,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS } qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, - pBlock->info.groupId); + pBlock->info.id.groupId); blockDataUpdateTsWindow(pBlock, 0); return 0; @@ -1289,12 +1111,12 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr } // clear the existed group id - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; ASSERT(!pbInfo->mergeResultBlock); doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); void* tbname = NULL; - if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { + if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -1316,7 +1138,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG } // clear the existed group id - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; if (!pbInfo->mergeResultBlock) { doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); } else { @@ -1327,11 +1149,11 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG } // clearing group id to continue to merge data that belong to different groups - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; } // clear the group id info in SSDataBlock, since the client does not need it - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; } } @@ -1635,7 +1457,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId); + setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); setInputDataBlock(pSup, pBlock, order, scanFlag, true); code = doAggregateImpl(pOperator, pSup->pCtx); if (code != 0) { @@ -1712,7 +1534,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows; taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows); - pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId; + pInfo->curGroupId = pInfo->existNewGroupBlock->info.id.groupId; pInfo->existNewGroupBlock = NULL; } @@ -1721,7 +1543,7 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera if (taosFillHasMoreResults(pInfo->pFillInfo)) { int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows; taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows); - pInfo->pRes->info.groupId = pInfo->curGroupId; + pInfo->pRes->info.id.groupId = pInfo->curGroupId; return; } @@ -1743,7 +1565,7 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlo setInputDataBlock(pNoFillSupp, pBlock, order, scanFlag, false); projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs, NULL); - pInfo->pRes->info.groupId = pBlock->info.groupId; + pInfo->pRes->info.id.groupId = pBlock->info.id.groupId; } static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { @@ -1761,7 +1583,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows > 0) { - pResBlock->info.groupId = pInfo->curGroupId; + pResBlock->info.id.groupId = pInfo->curGroupId; return pResBlock; } @@ -1783,8 +1605,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows); doApplyScalarCalculation(pOperator, pBlock, order, scanFlag); - if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) { - pInfo->curGroupId = pInfo->pRes->info.groupId; // the first data block + if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.id.groupId) { + pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block pInfo->totalInputRows += pInfo->pRes->info.rows; if (order == pInfo->pFillInfo->order) { @@ -1793,7 +1615,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.skey); } taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes); - } else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block + } else if (pInfo->curGroupId != pBlock->info.id.groupId) { // the new group data block pInfo->existNewGroupBlock = pBlock; // Fill the previous group data block, before handle the data block of new group. @@ -1810,13 +1632,13 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { // 1. The result in current group not reach the threshold of output result, continue // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) { - pResBlock->info.groupId = pInfo->curGroupId; + pResBlock->info.id.groupId = pInfo->curGroupId; return pResBlock; } doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) { - pResBlock->info.groupId = pInfo->curGroupId; + pResBlock->info.id.groupId = pInfo->curGroupId; return pResBlock; } } else if (pInfo->existNewGroupBlock) { // try next group @@ -1826,7 +1648,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows > pResultInfo->threshold) { - pResBlock->info.groupId = pInfo->curGroupId; + pResBlock->info.id.groupId = pInfo->curGroupId; return pResBlock; } } else { @@ -2967,10 +2789,10 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat continue; } - if (pBlock->info.groupId == 0) { - pBlock->info.groupId = pPos->groupId; + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = pPos->groupId; void* tbname = NULL; - if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { + if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -2978,7 +2800,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat tdbFree(tbname); } else { // current value belongs to different group, it can't be packed into one datablock - if (pBlock->info.groupId != pPos->groupId) { + if (pBlock->info.id.groupId != pPos->groupId) { releaseOutputBuf(pState, &key, pRow); break; } @@ -3058,11 +2880,11 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta continue; } - if (pBlock->info.groupId == 0) { - pBlock->info.groupId = pKey->groupId; + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = pKey->groupId; void* tbname = NULL; - if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { + if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -3070,7 +2892,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta tdbFree(tbname); } else { // current value belongs to different group, it can't be packed into one datablock - if (pBlock->info.groupId != pKey->groupId) { + if (pBlock->info.id.groupId != pKey->groupId) { releaseOutputBuf(pState, NULL, pRow); break; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 6dc8818900..4601175561 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -308,7 +308,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, - len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); + len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } @@ -325,7 +325,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { if (num > 0) { len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, - len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); + len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } @@ -697,7 +697,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { releaseBufPage(pInfo->pBuf, page); blockDataUpdateTsWindow(pInfo->binfo.pRes, 0); - pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId; + pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId; pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows; return pInfo->binfo.pRes; @@ -952,7 +952,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { taosArrayDestroy(pParInfo->rowIds); pParInfo->rowIds = NULL; blockDataUpdateTsWindow(pDest, pInfo->tsColIndex); - pDest->info.groupId = pParInfo->groupId; + pDest->info.id.groupId = pParInfo->groupId; pOperator->resultInfo.totalRows += pDest->info.rows; pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte); ASSERT(pDest->info.rows > 0); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 3839af9913..e7cce39dfd 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -185,7 +185,7 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* int32_t rowIndex = -1; SColumnInfoData* pSrc = NULL; - if (pLeftBlock->info.blockId == blockId) { + if (pLeftBlock->info.id.blockId == blockId) { pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId); rowIndex = leftPos; } else { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 4bba3a72e1..da77facb21 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -138,13 +138,13 @@ _error: static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo) { if (pLimitInfo->remainGroupOffset > 0) { // it is the first group - if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.groupId) { - pLimitInfo->currentGroupId = pBlock->info.groupId; + if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) { + pLimitInfo->currentGroupId = pBlock->info.id.groupId; return PROJECT_RETRIEVE_CONTINUE; - } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) { + } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) { // now it is the data from a new group pLimitInfo->remainGroupOffset -= 1; - pLimitInfo->currentGroupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.id.groupId; // ignore data block in current group if (pLimitInfo->remainGroupOffset > 0) { @@ -153,7 +153,7 @@ static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo } // set current group id of the project operator - pLimitInfo->currentGroupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.id.groupId; } return PROJECT_RETRIEVE_DONE; @@ -164,7 +164,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S // here check for a new group data, we need to handle the data of the previous group. ASSERT(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1); - if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) { + if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) { pLimitInfo->numOfOutputGroups += 1; if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { setOperatorCompleted(pOperator); @@ -306,7 +306,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - status = doIngroupLimitOffset(pLimitInfo, pBlock->info.groupId, pInfo->pRes, pOperator); + status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator); if (status == PROJECT_RETRIEVE_CONTINUE) { continue; } @@ -316,7 +316,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { if (pProjectInfo->mergeDataBlocks) { if (pRes->info.rows > 0) { - pFinalRes->info.groupId = pRes->info.groupId; + pFinalRes->info.id.groupId = pRes->info.id.groupId; pFinalRes->info.version = pRes->info.version; // continue merge data, ignore the group id @@ -511,11 +511,11 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { break; } - if (pIndefInfo->groupId == 0 && pBlock->info.groupId != 0) { - pIndefInfo->groupId = pBlock->info.groupId; // this is the initial group result + if (pIndefInfo->groupId == 0 && pBlock->info.id.groupId != 0) { + pIndefInfo->groupId = pBlock->info.id.groupId; // this is the initial group result } else { - if (pIndefInfo->groupId != pBlock->info.groupId) { // reset output buffer and computing status - pIndefInfo->groupId = pBlock->info.groupId; + if (pIndefInfo->groupId != pBlock->info.id.groupId) { // reset output buffer and computing status + pIndefInfo->groupId = pBlock->info.id.groupId; pIndefInfo->pNextGroupRes = pBlock; break; } @@ -643,3 +643,182 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { return (pRes->info.rows > 0) ? pRes : NULL; } + +static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { + size_t num = (pPseudoList != NULL) ? taosArrayGetSize(pPseudoList) : 0; + for (int32_t i = 0; i < num; ++i) { + pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i); + } +} + +int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, + int32_t numOfOutput, SArray* pPseudoList) { + setPseudoOutputColInfo(pResult, pCtx, pPseudoList); + + if (pSrcBlock == NULL) { + for (int32_t k = 0; k < numOfOutput; ++k) { + int32_t outputSlotId = pExpr[k].base.resSchema.slotId; + + ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE); + SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); + + int32_t type = pExpr[k].base.pParam[0].param.nType; + if (TSDB_DATA_TYPE_NULL == type) { + colDataAppendNNULL(pColInfoData, 0, 1); + } else { + colDataAppend(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false); + } + } + + pResult->info.rows = 1; + return TSDB_CODE_SUCCESS; + } + + if (pResult != pSrcBlock) { + pResult->info.id.groupId = pSrcBlock->info.id.groupId; + memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); + } + + // if the source equals to the destination, it is to create a new column as the result of scalar + // function or some operators. + bool createNewColModel = (pResult == pSrcBlock); + if (createNewColModel) { + blockDataEnsureCapacity(pResult, pResult->info.rows); + } + + int32_t numOfRows = 0; + + for (int32_t k = 0; k < numOfOutput; ++k) { + int32_t outputSlotId = pExpr[k].base.resSchema.slotId; + SqlFunctionCtx* pfCtx = &pCtx[k]; + SInputColumnInfoData* pInputData = &pfCtx->input; + + if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query + SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); + if (pResult->info.rows > 0 && !createNewColModel) { + colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0], + pInputData->numOfRows); + } else { + colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info); + } + + numOfRows = pInputData->numOfRows; + } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { + SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); + + int32_t offset = createNewColModel ? 0 : pResult->info.rows; + + int32_t type = pExpr[k].base.pParam[0].param.nType; + if (TSDB_DATA_TYPE_NULL == type) { + colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows); + } else { + char* p = taosVariantGet(&pExpr[k].base.pParam[0].param, type); + for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { + colDataAppend(pColInfoData, i + offset, p, false); + } + } + + numOfRows = pSrcBlock->info.rows; + } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) { + SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); + taosArrayPush(pBlockList, &pSrcBlock); + + SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; + + SScalarParam dest = {.columnData = &idata}; + int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pBlockList); + return code; + } + + int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; + ASSERT(pResult->info.capacity > 0); + + colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows); + colDataDestroy(&idata); + + numOfRows = dest.numOfRows; + taosArrayDestroy(pBlockList); + } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { + // _rowts/_c0, not tbname column + if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) { + // do nothing + } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx); + pfCtx->fpSet.init(pfCtx, pResInfo); + + pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); + pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset + + // set the timestamp(_rowts) output buffer + if (taosArrayGetSize(pPseudoList) > 0) { + int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); + pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; + } + + // link pDstBlock to set selectivity value + if (pfCtx->subsidiaries.num > 0) { + pfCtx->pDstBlock = pResult; + } + + int32_t code = pfCtx->fpSet.process(pfCtx); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + numOfRows = pResInfo->numOfRes; + } else if (fmIsAggFunc(pfCtx->functionId)) { + // selective value output should be set during corresponding function execution + if (fmIsSelectValueFunc(pfCtx->functionId)) { + continue; + } + // _group_key function for "partition by tbname" + csum(col_name) query + SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); + int32_t slotId = pfCtx->param[0].pCol->slotId; + + // todo handle the json tag + SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId); + for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) { + bool isNull = colDataIsNull_s(pInput, f); + if (isNull) { + colDataAppendNULL(pOutput, pResult->info.rows + f); + } else { + char* data = colDataGetData(pInput, f); + colDataAppend(pOutput, pResult->info.rows + f, data, isNull); + } + } + + } else { + SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); + taosArrayPush(pBlockList, &pSrcBlock); + + SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; + + SScalarParam dest = {.columnData = &idata}; + int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pBlockList); + return code; + } + + int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; + ASSERT(pResult->info.capacity > 0); + colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows); + colDataDestroy(&idata); + + numOfRows = dest.numOfRows; + taosArrayDestroy(pBlockList); + } + } else { + return TSDB_CODE_OPS_NOT_SUPPORT; + } + } + + if (!createNewColModel) { + pResult->info.rows += numOfRows; + } + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c0bea731bd..84b7678b9f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -184,7 +184,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* SExprSupp* pSup1 = pTableScanInfo->base.pdInfo.pExprSup; SFilePage* pPage = NULL; - SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pPage); + SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage); if (pRow == NULL) { return TSDB_CODE_SUCCESS; @@ -484,13 +484,13 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int // 1. check if it is existed in meta cache if (pCache == NULL) { metaReaderInit(&mr, pHandle->meta, 0); - code = metaGetTableEntryByUidCache(&mr, pBlock->info.uid); + code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { - qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, + qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno), idStr); } else { - qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno), idStr); } metaReaderClear(&mr); return terrno; @@ -505,16 +505,16 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int } else { pCache->metaFetch += 1; - h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(pBlock->info.uid)); + h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); if (h == NULL) { metaReaderInit(&mr, pHandle->meta, 0); - code = metaGetTableEntryByUidCache(&mr, pBlock->info.uid); + code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid); if (code != TSDB_CODE_SUCCESS) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", - pBlock->info.uid, tstrerror(terrno), idStr); + pBlock->info.id.uid, tstrerror(terrno), idStr); } else { - qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno), idStr); } metaReaderClear(&mr); @@ -528,7 +528,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int val = *pVal; freeReader = true; - int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(uint64_t), pVal, + int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal, sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW); if (ret != TAOS_LRU_STATUS_OK) { qError("failed to put meta into lru cache, code:%d, %s", ret, idStr); @@ -642,13 +642,13 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { SDataBlockInfo* pBInfo = &pBlock->info; int32_t rows = 0; - tsdbRetrieveDataBlockInfo(pTableScanInfo->base.dataReader, &rows, &pBInfo->uid, &pBInfo->window); + tsdbRetrieveDataBlockInfo(pTableScanInfo->base.dataReader, &rows, &pBInfo->id.uid, &pBInfo->window); blockDataEnsureCapacity(pBlock, rows); // todo remove it latter pBInfo->rows = rows; - ASSERT(pBInfo->uid != 0); - pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); + ASSERT(pBInfo->id.uid != 0); + pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); uint32_t status = 0; int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); @@ -668,13 +668,13 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime; // todo refactor - /*pTableScanInfo->lastStatus.uid = pBlock->info.uid;*/ + /*pTableScanInfo->lastStatus.uid = pBlock->info.id.uid;*/ /*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/ pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; - pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid; + pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid; pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; - ASSERT(pBlock->info.uid != 0); + ASSERT(pBlock->info.id.uid != 0); return pBlock; } return NULL; @@ -786,7 +786,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { - ASSERT(result->info.uid != 0); + ASSERT(result->info.id.uid != 0); return result; } @@ -1009,7 +1009,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU SDataBlockInfo* pBInfo = &pBlock->info; int32_t rows = 0; - tsdbRetrieveDataBlockInfo(pReader, &rows, &pBInfo->uid, &pBInfo->window); + tsdbRetrieveDataBlockInfo(pReader, &rows, &pBInfo->id.uid, &pBInfo->window); SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL); blockDataEnsureCapacity(pBlock, rows); @@ -1018,7 +1018,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU relocateColumnData(pBlock, pTableScanInfo->base.matchInfo.pList, pCols, true); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, rows); - pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->uid); + pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->id.uid); } tsdbReaderClose(pReader); @@ -1175,7 +1175,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 pResult->info.calWin = pInfo->updateWin; return pResult; } - } else if (pResult->info.groupId == pInfo->groupId) { + } else if (pResult->info.id.groupId == pInfo->groupId) { pResult->info.calWin = pInfo->updateWin; return pResult; } @@ -1366,7 +1366,7 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { if (pBlock == NULL || pBlock->info.rows == 0) return; void* tbname = NULL; - if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { + if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -1399,8 +1399,8 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { pBlock->info.parTbName[0] = 0; } - if (pBlock->info.groupId && pBlock->info.parTbName[0]) { - streamStatePutParName(pState, pBlock->info.groupId, pBlock->info.parTbName); + if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { + streamStatePutParName(pState, pBlock->info.id.groupId, pBlock->info.parTbName); } blockDataDestroy(pSrcBlock); @@ -1434,7 +1434,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; - bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid); + bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid); for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; @@ -1445,18 +1445,18 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock isClosed = isCloseWindow(&win, &pInfo->twAggSup); } // must check update info first. - bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); + bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]); bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && - isDeletedStreamWindow(&win, pBlock->info.groupId, + isDeletedStreamWindow(&win, pBlock->info.id.groupId, pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup); if ((update || closedWin) && out) { qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); uint64_t gpId = 0; - appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId, + appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId, NULL); if (closedWin && pInfo->partitionSup.needCalc) { gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId); - appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId, + appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId, NULL); } } @@ -1476,11 +1476,11 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); pInfo->pRes->info.rows = pBlock->info.rows; - pInfo->pRes->info.uid = pBlock->info.uid; + pInfo->pRes->info.id.uid = pBlock->info.id.uid; pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.version = pBlock->info.version; - pInfo->pRes->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); + pInfo->pRes->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { @@ -1807,8 +1807,8 @@ FETCH_NEXT_BLOCK: int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); - if (pBlock->info.groupId && pBlock->info.parTbName[0]) { - streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, pBlock->info.parTbName); + if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { + streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName); } // TODO move into scan pBlock->info.calWin.skey = INT64_MIN; @@ -1960,7 +1960,7 @@ FETCH_NEXT_BLOCK: setBlockIntoRes(pInfo, &block, false); - if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, + if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId, pInfo->pRes->info.version)) { printDataBlock(pInfo->pRes, "stream scan ignore"); blockDataCleanup(pInfo->pRes); @@ -2050,7 +2050,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { } int32_t rows = 0; - tsdbRetrieveDataBlockInfo(pInfo->dataReader, &rows, &pBlock->info.uid, &pBlock->info.window); + tsdbRetrieveDataBlockInfo(pInfo->dataReader, &rows, &pBlock->info.id.uid, &pBlock->info.window); pBlock->info.rows = rows; SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); @@ -2059,9 +2059,9 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, terrno); } - qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.uid); + qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid); pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; - pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid; + pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid; pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; return pBlock; } @@ -2555,7 +2555,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { blockDataCleanup(pBlock); int32_t rows = 0; - tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window); + tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.id.uid, &pBlock->info.window); blockDataEnsureCapacity(pBlock, rows); pBlock->info.rows = rows; @@ -2577,7 +2577,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { continue; } - pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); + pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -2773,7 +2773,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); if (pBlock != NULL) { - pBlock->info.groupId = pInfo->groupId; + pBlock->info.id.groupId = pInfo->groupId; pOperator->resultInfo.totalRows += pBlock->info.rows; return pBlock; } else { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index f2c8dc5083..ec754f31b0 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -359,7 +359,7 @@ SSDataBlock* fetchNextGroupSortDataBlock(void* param) { SOperatorInfo* childOp = source->childOpInfo; SSDataBlock* block = childOp->fpSet.getNextFn(childOp); if (block != NULL) { - if (block->info.groupId == grpSortOpInfo->currGroupId) { + if (block->info.id.groupId == grpSortOpInfo->currGroupId) { grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; return block; } else { @@ -439,7 +439,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); return NULL; } - pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; + pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId; pInfo->childOpStatus = CHILD_OP_NEW_GROUP; beginSortGroup(pOperator); } @@ -451,13 +451,13 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pInfo); if (pBlock != NULL) { - pBlock->info.groupId = pInfo->currGroupId; + pBlock->info.id.groupId = pInfo->currGroupId; pOperator->resultInfo.totalRows += pBlock->info.rows; return pBlock; } else { if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) { finishSortGroup(pOperator); - pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; + pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId; beginSortGroup(pOperator); } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) { finishSortGroup(pOperator); @@ -691,10 +691,10 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData pInfo->limitInfo.numOfOutputRows += p->info.rows; pDataBlock->info.rows = p->info.rows; - pDataBlock->info.groupId = pInfo->groupId; + pDataBlock->info.id.groupId = pInfo->groupId; } - qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.groupId, + qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId, pDataBlock->info.rows); return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 7ef2668804..3d35326749 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1831,39 +1831,39 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con } static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } - SBlockDistInfo* pBlockScanInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SBlockDistInfo* pBlockScanInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; - int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, - (int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo)); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } + STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; + int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, + (int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } - tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); - blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle); + tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); + blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle); - SSDataBlock* pBlock = pBlockScanInfo->pResBlock; + SSDataBlock* pBlock = pBlockScanInfo->pResBlock; - int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId; - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId); + int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId; + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId); - int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); - char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); - tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); - varDataSetLen(p, len); + int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); + char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); + tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); + varDataSetLen(p, len); - colDataAppend(pColInfo, 0, p, false); - taosMemoryFree(p); + colDataAppend(pColInfo, 0, p, false); + taosMemoryFree(p); - pBlock->info.rows = 1; - pOperator->status = OP_EXEC_DONE; - return pBlock; + pBlock->info.rows = 1; + pOperator->status = OP_EXEC_DONE; + return pBlock; } static void destroyBlockDistScanOperatorInfo(void* param) { diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index ba826a23d2..ec02633563 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1092,7 +1092,7 @@ static bool checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t group } static void buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock) { - uint64_t groupId = pBlock->info.groupId; + uint64_t groupId = pBlock->info.id.groupId; if (pFillSup->hasDelete && !checkResult(pFillSup, ts, groupId)) { return; } @@ -1131,7 +1131,7 @@ static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo* static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) { while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { - uint64_t groupId = pBlock->info.groupId; + uint64_t groupId = pBlock->info.id.groupId; SWinKey key = {.groupId = groupId, .ts = pFillInfo->current}; if (pFillSup->hasDelete && !checkResult(pFillSup, pFillInfo->current, groupId)) { pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, @@ -1230,7 +1230,7 @@ void keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol, int32_t rowId, SSDataBlock* pRes) { - uint64_t groupId = pBlock->info.groupId; + uint64_t groupId = pBlock->info.id.groupId; getWindowFromDiscBuf(pOperator, tsCol[rowId], groupId, pFillSup); if (pFillSup->prev.key == pFillInfo->preRowKey) { resetFillWindow(&pFillSup->prev); @@ -1245,9 +1245,9 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { SStreamFillSupporter* pFillSup = pInfo->pFillSup; SStreamFillInfo* pFillInfo = pInfo->pFillInfo; SSDataBlock* pBlock = pInfo->pSrcBlock; - uint64_t groupId = pBlock->info.groupId; + uint64_t groupId = pBlock->info.id.groupId; SSDataBlock* pRes = pInfo->pRes; - pRes->info.groupId = groupId; + pRes->info.id.groupId = groupId; if (hasRemainCalc(pFillInfo)) { doStreamFillRange(pFillInfo, pFillSup, pRes); } @@ -1342,14 +1342,14 @@ static void doDeleteFillFinalize(SOperatorInfo* pOperator) { tSimpleHashClear(pInfo->pFillSup->pResMap); for (; pFillInfo->delIndex < size; pFillInfo->delIndex++) { STimeRange* range = taosArrayGet(pFillInfo->delRanges, pFillInfo->delIndex); - if (pInfo->pRes->info.groupId != 0 && pInfo->pRes->info.groupId != range->groupId) { + if (pInfo->pRes->info.id.groupId != 0 && pInfo->pRes->info.id.groupId != range->groupId) { return; } getWindowFromDiscBuf(pOperator, range->skey, range->groupId, pInfo->pFillSup); setDeleteFillValueInfo(range->skey, range->ekey, pInfo->pFillSup, pInfo->pFillInfo); if (pInfo->pFillInfo->needFill) { doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); - pInfo->pRes->info.groupId = range->groupId; + pInfo->pRes->info.id.groupId = range->groupId; } SWinKey key = {.ts = range->skey, .groupId = range->groupId}; streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key); @@ -1435,7 +1435,7 @@ static void doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock pSup = &pInfo->pFillSup->notFillExprSup; setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false); projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL); - pDstBlock->info.groupId = pSrcBlock->info.groupId; + pDstBlock->info.id.groupId = pSrcBlock->info.id.groupId; blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0e0ec5b339..a6a477a9e3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -655,7 +655,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0); int64_t prevTs = *(int64_t*)pTsKey->pData; - if (groupId == pBlock->info.groupId) { + if (groupId == pBlock->info.id.groupId) { doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP, pSup); } @@ -927,7 +927,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t startPos = 0; int32_t numOfOutput = pSup->numOfExprs; int64_t* tsCols = extractTsCol(pBlock, pInfo); - uint64_t tableGroupId = pBlock->info.groupId; + uint64_t tableGroupId = pBlock->info.id.groupId; bool ascScan = (pInfo->inputOrder == TSDB_ORDER_ASC); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; @@ -1112,7 +1112,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI SExprSupp* pSup = &pOperator->exprSupp; SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId); - int64_t gid = pBlock->info.groupId; + int64_t gid = pBlock->info.id.groupId; bool masterScan = true; int32_t numOfOutput = pOperator->exprSupp.numOfExprs; @@ -1829,7 +1829,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator bool masterScan = true; int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - int64_t gid = pBlock->info.groupId; + int64_t gid = pBlock->info.id.groupId; int64_t gap = pInfo->gap; @@ -2333,7 +2333,7 @@ void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* } // clear the existed group id - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; buildDataBlockFromGroupRes(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); } @@ -2573,7 +2573,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); } setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap); if (IS_FINAL_OP(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2591,7 +2591,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.groupId, NULL); + doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.id.groupId, NULL); } maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.watermark); @@ -3086,7 +3086,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - uint64_t groupId = pSDataBlock->info.groupId; + uint64_t groupId = pSDataBlock->info.id.groupId; int64_t code = TSDB_CODE_SUCCESS; SResultRow* pResult = NULL; int32_t rows = pSDataBlock->info.rows; @@ -3377,7 +3377,7 @@ void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroup } // clear the existed group id - pBlock->info.groupId = 0; + pBlock->info.id.groupId = 0; buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); } @@ -3854,7 +3854,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamStateAggOperatorInfo* pInfo = pOperator->info; int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - int64_t groupId = pSDataBlock->info.groupId; + int64_t groupId = pSDataBlock->info.id.groupId; int64_t code = TSDB_CODE_SUCCESS; TSKEY* tsCols = NULL; SResultRow* pResult = NULL; @@ -4178,7 +4178,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) { - pRes->info.groupId = pMiaInfo->groupId; + pRes->info.id.groupId = pMiaInfo->groupId; pMiaInfo->curTs = INT64_MIN; pMiaInfo->groupId = 0; } @@ -4203,7 +4203,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { pBlock = pMiaInfo->prefetchedBlock; pMiaInfo->prefetchedBlock = NULL; - pMiaInfo->groupId = pBlock->info.groupId; + pMiaInfo->groupId = pBlock->info.id.groupId; } // no data exists, all query processing is done @@ -4220,12 +4220,12 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { } if (pMiaInfo->groupId == 0) { - if (pMiaInfo->groupId != pBlock->info.groupId) { - pMiaInfo->groupId = pBlock->info.groupId; - pRes->info.groupId = pMiaInfo->groupId; + if (pMiaInfo->groupId != pBlock->info.id.groupId) { + pMiaInfo->groupId = pBlock->info.id.groupId; + pRes->info.id.groupId = pMiaInfo->groupId; } } else { - if (pMiaInfo->groupId != pBlock->info.groupId) { + if (pMiaInfo->groupId != pBlock->info.id.groupId) { // if there are unclosed time window, close it firstly. ASSERT(pMiaInfo->curTs != INT64_MIN); finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo); @@ -4236,7 +4236,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { break; } else { // continue - pRes->info.groupId = pMiaInfo->groupId; + pRes->info.id.groupId = pMiaInfo->groupId; } } @@ -4443,7 +4443,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* int32_t startPos = 0; int32_t numOfOutput = pExprSup->numOfExprs; int64_t* tsCols = extractTsCol(pBlock, iaInfo); - uint64_t tableGroupId = pBlock->info.groupId; + uint64_t tableGroupId = pBlock->info.id.groupId; bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; @@ -4549,7 +4549,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { pBlock = downstream->fpSet.getNextFn(downstream); } else { pBlock = miaInfo->prefetchedBlock; - miaInfo->groupId = pBlock->info.groupId; + miaInfo->groupId = pBlock->info.id.groupId; miaInfo->prefetchedBlock = NULL; } @@ -4561,8 +4561,8 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { if (!miaInfo->hasGroupId) { miaInfo->hasGroupId = true; - miaInfo->groupId = pBlock->info.groupId; - } else if (miaInfo->groupId != pBlock->info.groupId) { + miaInfo->groupId = pBlock->info.id.groupId; + } else if (miaInfo->groupId != pBlock->info.id.groupId) { miaInfo->prefetchedBlock = pBlock; break; } @@ -4576,7 +4576,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { } } - pRes->info.groupId = miaInfo->groupId; + pRes->info.id.groupId = miaInfo->groupId; } if (miaInfo->inputBlocksFinished) { @@ -4585,7 +4585,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { if (listNode != NULL) { SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data); // finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes); - pRes->info.groupId = grpWin->groupId; + pRes->info.id.groupId = grpWin->groupId; } } @@ -4744,7 +4744,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); minTs = TMIN(minTs, pBlock->info.window.skey); - doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap); } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index e0a0b9442e..3f91142708 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -417,8 +417,8 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { SSDataBlock* pRightBlock = pRightSource->src.pBlock; if (pParam->cmpGroupId) { - if (pLeftBlock->info.groupId != pRightBlock->info.groupId) { - return pLeftBlock->info.groupId < pRightBlock->info.groupId ? -1 : 1; + if (pLeftBlock->info.id.groupId != pRightBlock->info.id.groupId) { + return pLeftBlock->info.id.groupId < pRightBlock->info.id.groupId ? -1 : 1; } } @@ -826,7 +826,7 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) { } } -uint64_t tsortGetGroupId(STupleHandle* pVHandle) { return pVHandle->pBlock->info.groupId; } +uint64_t tsortGetGroupId(STupleHandle* pVHandle) { return pVHandle->pBlock->info.id.groupId; } SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) { SSortExecInfo info = {0}; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 8715aa0be1..c78ec5b999 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1096,7 +1096,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { SSDataBlock *pTempBlock = createDataBlock(); pTempBlock->info.rows = pInput->totalRows; - pTempBlock->info.uid = pInput->uid; + pTempBlock->info.id.uid = pInput->uid; for (int32_t i = 0; i < numOfCols; ++i) { blockDataAppendColInfo(pTempBlock, pInput->pData[i]); } diff --git a/source/libs/index/test/index_executor_tests.cpp b/source/libs/index/test/index_executor_tests.cpp index bcc474dc8b..c8a7ca98f0 100644 --- a/source/libs/index/test/index_executor_tests.cpp +++ b/source/libs/index/test/index_executor_tests.cpp @@ -78,7 +78,7 @@ void sifAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *sl blockDataEnsureCapacity(res, rows); *dataBlockId = taosArrayGetSize(pBlockList) - 1; - res->info.blockId = *dataBlockId; + res->info.id.blockId = *dataBlockId; *slotId = 0; } else { SSDataBlock *res = *(SSDataBlock **)taosArrayGetLast(pBlockList); diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index d1271e9290..55f33b7a3e 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -378,7 +378,7 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t int32_t index = -1; for (int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) { SSDataBlock *pb = taosArrayGetP(ctx->pBlockList, i); - if (pb->info.blockId == ref->dataBlockId) { + if (pb->info.id.blockId == ref->dataBlockId) { index = i; break; } @@ -1384,7 +1384,7 @@ EDealRes sclWalkTarget(SNode *pNode, SScalarCtx *ctx) { int32_t index = -1; for (int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) { SSDataBlock *pb = taosArrayGetP(ctx->pBlockList, i); - if (pb->info.blockId == target->dataBlockId) { + if (pb->info.id.blockId == target->dataBlockId) { index = i; break; } diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index 0fd0c98c1a..dae26d3d58 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -98,7 +98,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s taosArrayPush(pBlockList, &res); *dataBlockId = taosArrayGetSize(pBlockList) - 1; - res->info.blockId = *dataBlockId; + res->info.id.blockId = *dataBlockId; *slotId = 0; } else { SSDataBlock *res = *(SSDataBlock **)taosArrayGetLast(pBlockList); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 573a1ea31f..2c36c299ee 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -463,7 +463,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat continue; } - if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.groupId) < 0) { + if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.id.groupId) < 0) { goto FAIL_SHUFFLE_DISPATCH; } } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 15526cd8bb..1589fddda4 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -166,7 +166,7 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) { TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) { if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN; TSKEY maxTs = INT64_MIN; - int64_t tbUid = pBlock->info.uid; + int64_t tbUid = pBlock->info.id.uid; SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol);