diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index fa203e231a..41e91ce6a6 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -238,10 +238,16 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32 static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { - int32_t* colSizes = (int32_t*)data; + int32_t* actualLen = (int32_t*) data; + data += sizeof(int32_t); + uint64_t* groupId = (uint64_t*) data; + data += sizeof(uint64_t); + + int32_t* colSizes = (int32_t*)data; data += numOfCols * sizeof(int32_t); - *dataLen = (numOfCols * sizeof(int32_t)); + + *dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t)); int32_t numOfRows = pBlock->info.rows; for (int32_t col = 0; col < numOfCols; ++col) { @@ -273,6 +279,9 @@ static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* da colSizes[col] = htonl(colSizes[col]); } + + *actualLen = *dataLen; + *groupId = pBlock->info.groupId; } #ifdef __cplusplus diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 8fcd16a622..73b0a0e4dc 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -835,10 +835,21 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 return code; } - int32_t* colLength = (int32_t*)pResultInfo->pData; - char* pStart = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols; + char* p = (char*) pResultInfo->pData; + + int32_t dataLen = *(int32_t*) p; + p += sizeof(int32_t); + + uint64_t groupId = *(uint64_t*) p; + p += sizeof(uint64_t); + + int32_t* colLength = (int32_t*)p; + p += sizeof(int32_t) * numOfCols; + + char* pStart = p; for (int32_t i = 0; i < numOfCols; ++i) { colLength[i] = htonl(colLength[i]); + ASSERT(colLength[i] < dataLen); if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { pResultInfo->pCol[i].offset = (int32_t*)pStart; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 800bcd70ce..09452b584d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -644,12 +644,12 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) { /** * @refitem blockDataToBuf for the meta size - * * @param pBlock * @return */ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { - return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t); + // | total rows/total length | block group id | each column length | + return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t); } double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { @@ -1219,7 +1219,6 @@ void colDataDestroy(SColumnInfoData* pColData) { taosMemoryFree(pColData->pData); } - static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) { int32_t len = BitmapLen(total); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index fd640471bd..c73566355d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3223,8 +3223,13 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows, cur->win.skey, cur->win.ekey, pHandle->idStr); - // pDataBlockInfo->uid = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign - // the table uid + pDataBlockInfo->uid = uid; + +#if 0 + // for multi-group data query processing test purpose + pDataBlockInfo->groupId = uid; +#endif + pDataBlockInfo->rows = cur->rows; pDataBlockInfo->window = cur->win; pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 4853bb4eb3..7af4546ff3 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -304,8 +304,8 @@ int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t memcpy(row.buf, tbuf, len); row.level = level; - row.len = len; - ctx->dataSize += len; + row.len = len; + ctx->dataSize += row.len; if (NULL == taosArrayPush(ctx->rows, &row)) { qError("taosArrayPush row to explain res rows failed"); @@ -783,7 +783,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { } int32_t colNum = 1; - int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize; + int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); if (NULL == rsp) { qError("malloc SRetrieveTableRsp failed, size:%d", rspSize); @@ -793,29 +793,38 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { rsp->completed = 1; rsp->numOfRows = htonl(rowNum); - *(int32_t *)rsp->data = htonl(pCtx->dataSize); + // payload length + *(int32_t *)rsp->data = sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize; - int32_t *offset = (int32_t *)((char *)rsp->data + sizeof(int32_t)); + // group id + *(uint64_t*)(rsp->data + sizeof(int32_t)) = 0; + + // column length + int32_t* colLength = (int32_t *)(rsp->data + sizeof(int32_t) + sizeof(uint64_t)); + + // varchar column offset segment + int32_t *offset = (int32_t *)((char *)colLength + sizeof(int32_t)); + + // varchar data real payload char *data = (char *)(offset + rowNum); - int32_t tOffset = 0; - + + char* start = data; for (int32_t i = 0; i < rowNum; ++i) { SQueryExplainRowInfo *row = taosArrayGet(pCtx->rows, i); - *offset = tOffset; - tOffset += row->len; + offset[i] = data - start; - memcpy(data, row->buf, row->len); - - ++offset; + varDataCopy(data, row->buf); + ASSERT(varDataTLen(row->buf) == row->len); data += row->len; } - *pRsp = rsp; + *colLength = htonl(data - start); + rsp->compLen = htonl(rspSize); + *pRsp = rsp; return TSDB_CODE_SUCCESS; } - int32_t qExplainPrepareCtx(SQueryPlan *pDag, SExplainCtx **pCtx) { int32_t code = 0; SNodeListNode *plans = NULL; @@ -922,9 +931,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) { int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) { QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0)); - QRY_ERR_RET(qExplainAppendPlanRows(pCtx)); - QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp)); return TSDB_CODE_SUCCESS; @@ -994,13 +1001,10 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) { SExplainCtx *pCtx = NULL; QRY_ERR_RET(qExplainPrepareCtx(pDag, &pCtx)); - QRY_ERR_JRET(qExplainGenerateRsp(pCtx, pRsp)); _return: - qExplainFreeCtx(pCtx); - QRY_RET(code); } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 834d37927a..f0cb1c1107 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -40,8 +40,6 @@ #define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) -#define curTimeWindowIndex(_winres) ((_winres)->curIndex) - typedef struct SGroupResInfo { int32_t totalGroup; int32_t currentGroup; @@ -68,11 +66,16 @@ typedef struct SResultRowPosition { int32_t offset; } SResultRowPosition; +typedef struct SResKeyPos { + SResultRowPosition pos; + uint64_t groupId; + char key[]; +} SResKeyPos; + typedef struct SResultRowInfo { SResultRowPosition *pPosition; int32_t size; // number of result set int32_t capacity; // max capacity -// int32_t curPos; // current active result row index of pResult list SResultRowPosition cur; } SResultRowInfo; @@ -135,7 +138,7 @@ typedef struct { int32_t colId; } SStddevInterResult; -void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo); +void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, bool sortGroupResult); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index e897bc8892..a5ce01dba2 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -64,10 +64,10 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { } // data format: -// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+ -// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... -// | | sizeof(int32_t) * numOfCols | actual size | | actual size | | -// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+ +// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+ +// |SDataCacheEntry | total length | group id | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... +// | | (4 bytes) |(8 bytes) | sizeof(int32_t) * numOfCols | actual size | | actual size | | +// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+ // The length of bitmap is decided by number of rows of this data block, and the length of each column data is // recorded in the first segment, next to the struct header static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 98b72cf4c2..c3fa777779 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -186,12 +186,50 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { pGroupResInfo->index = 0; } -void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) { +static int32_t resultrowCompar1(const void* p1, const void* p2) { + SResKeyPos* pp1 = *(SResKeyPos**) p1; + SResKeyPos* pp2 = *(SResKeyPos**) p2; + + if (pp1->groupId == pp2->groupId) { + int64_t pts1 = *(int64_t*) pp1->key; + int64_t pts2 = *(int64_t*) pp2->key; + + if (pts1 == pts2) { + return 0; + } else { + return pts1 < pts2? -1:1; + } + } else { + return pp1->groupId < pp2->groupId? -1:1; + } +} + +void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, bool sortGroupResult) { if (pGroupResInfo->pRows != NULL) { taosArrayDestroy(pGroupResInfo->pRows); } - pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pPosition, pResultInfo->size, sizeof(SResultRowPosition)); + // extract the result rows information from the hash map + void* pData = NULL; + pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES); + + size_t keyLen = 0; + while((pData = taosHashIterate(pHashmap, pData)) != NULL) { + void* key = taosHashGetKey(pData, &keyLen); + + SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition)); + + p->groupId = *(uint64_t*) key; + p->pos = *(SResultRowPosition*) pData; + memcpy(p->key, key + sizeof(uint64_t), keyLen - sizeof(uint64_t)); + + taosArrayPush(pGroupResInfo->pRows, &p); + } + + if (sortGroupResult) { + qsort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, resultrowCompar1); + } + pGroupResInfo->index = 0; assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b28dadb594..be3e95224b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -431,6 +431,13 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, return pResultRow; } +/** + * the struct of key in hash table + * +----------+---------------+ + * | group id | key data | + * | 8 bytes | actual length | + * +----------+---------------+ + */ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) { @@ -1447,7 +1454,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe SArray* pUpdated = NULL; if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - pUpdated = taosArrayInit(4, sizeof(SResultRowPosition)); + pUpdated = taosArrayInit(4, POINTER_BYTES); } int32_t step = 1; @@ -1459,8 +1466,6 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe if (pSDataBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; -// assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows - 1] == -// pSDataBlock->info.window.ekey); } int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1); @@ -1478,7 +1483,11 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; + SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + pos->groupId = tableGroupId; + pos->pos = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset}; + *(int64_t*) pos->key = pResult->win.skey; + taosArrayPush(pUpdated, &pos); } @@ -1550,7 +1559,11 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; + SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + pos->groupId = tableGroupId; + pos->pos = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset}; + *(int64_t*) pos->key = pResult->win.skey; + taosArrayPush(pUpdated, &pos); } @@ -2812,6 +2825,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { } } +// todo merged with the build group result. void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) { @@ -2846,40 +2860,36 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD } } +// todo merged with the build group result. void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList, int32_t* rowCellInfoOffset) { size_t num = taosArrayGetSize(pUpdateList); for (int32_t i = 0; i < num; ++i) { - SResultRowPosition* pPos = taosArrayGet(pUpdateList, i); - - SFilePage* bufPage = getBufPage(pBuf, pPos->pageId); - SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset); + SResKeyPos * pPos = taosArrayGetP(pUpdateList, i); + SFilePage* bufPage = getBufPage(pBuf, pPos->pos.pageId); + SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset); +// for (int32_t j = 0; j < numOfOutput; ++j) { pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); - +// struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; - if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { - continue; - } - - if (pCtx[j].fpSet.process) { // TODO set the dummy function. -// pCtx[j].fpSet.finalize(&pCtx[j]); - pResInfo->initialized = true; - } - +// if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { +// continue; +// } +// +// if (pCtx[j].fpSet.process) { // TODO set the dummy function. +//// pCtx[j].fpSet.finalize(&pCtx[j]); +// pResInfo->initialized = true; +// } +// if (pRow->numOfRows < pResInfo->numOfRes) { pRow->numOfRows = pResInfo->numOfRes; } } releaseBufPage(pBuf, bufPage); - /* - * set the number of output results for group by normal columns, the number of output rows usually is 1 except - * the top and bottom query - */ - // buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput); } } @@ -3092,7 +3102,6 @@ void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWin } /** - * copyToOutputBuf support copy data in ascending/descending order * For interval query of both super table and table, copy the data in ascending order, since the output results are * ordered in SWindowResutl already. While handling the group by query for both table and super table, * all group result are completed already. @@ -3101,7 +3110,7 @@ void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWin * @param result */ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, - int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) { + int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfResult = pBlock->info.rows; // there are already exists result rows @@ -3120,10 +3129,10 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased } for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { - SResultRowPosition* pPos = taosArrayGet(pGroupResInfo->pRows, i); - SFilePage* page = getBufPage(pBuf, pPos->pageId); + SResKeyPos *pPos = taosArrayGetP(pGroupResInfo->pRows, i); + SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); - SResultRow* pRow = (SResultRow*)((char*)page + pPos->offset); + SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset); if (pRow->numOfRows == 0) { pGroupResInfo->index += 1; continue; @@ -3767,9 +3776,15 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI blockDataEnsureCapacity(pRes, numOfRows); if (pColList == NULL) { // data from other sources - int32_t* colLen = (int32_t*)pData; - char* pStart = pData + sizeof(int32_t) * numOfOutput; + int32_t dataLen = *(int32_t*) pData; + pData += sizeof(int32_t); + pRes->info.groupId = *(uint64_t*) pData; + pData += sizeof(uint64_t); + + int32_t* colLen = (int32_t*)pData; + + char* pStart = pData + sizeof(int32_t) * numOfOutput; for (int32_t i = 0; i < numOfOutput; ++i) { colLen[i] = htonl(colLen[i]); ASSERT(colLen[i] > 0); @@ -3822,7 +3837,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI blockDataEnsureCapacity(&block, numOfRows); - int32_t* colLen = (int32_t*) pStart; + int32_t dataLen = *(int32_t*) pStart; + uint64_t groupId = *(uint64_t*) (pStart + sizeof(int32_t)); + pStart += sizeof(int32_t) + sizeof(uint64_t); + + int32_t* colLen = (int32_t*) (pStart); pStart += sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { @@ -4277,18 +4296,6 @@ static void assignExprInfo(SExprInfo* dst, const SExprInfo* src) { // } } -static SExprInfo* exprArrayDup(SArray* pExprList) { - size_t numOfOutput = taosArrayGetSize(pExprList); - - SExprInfo* p = taosMemoryCalloc(numOfOutput, sizeof(SExprInfo)); - for (int32_t i = 0; i < numOfOutput; ++i) { - SExprInfo* pExpr = taosArrayGetP(pExprList, i); - assignExprInfo(&p[i], pExpr); - } - - return p; -} - // TODO merge aggregate super table static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { @@ -4779,7 +4786,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfOutput, pAggInfo->aggSup.pResultBuf, &pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset); - initGroupResInfo(&pAggInfo->groupResInfo, &pAggInfo->binfo.resultRowInfo); + initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false); OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; } @@ -5077,7 +5084,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); - hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); + hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId); #if 0 // test for encode/decode result info if(pOperator->encodeResultRow){ @@ -5099,7 +5106,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); - initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); + initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; } @@ -5224,7 +5231,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); - initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); +// initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { @@ -5275,7 +5282,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); - initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); +// initGroupedResultInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); OPTR_SET_OPENED(pOperator); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -5400,7 +5407,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { closeAllResultRows(&pBInfo->resultRowInfo); finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); - initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); + initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { @@ -5449,7 +5456,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) closeAllResultRows(&pBInfo->resultRowInfo); finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); - initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); + initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { @@ -5881,7 +5888,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->order = TSDB_ORDER_ASC; pInfo->interval = *pInterval; - pInfo->execModel = pTaskInfo->execModel; + pInfo->execModel = OPTR_EXEC_MODEL_STREAM; +// pInfo->execModel = pTaskInfo->execModel; pInfo->win = pTaskInfo->window; pInfo->twAggSup = *pTwAggSupp; pInfo->primaryTsIndex = primaryTsSlotId; @@ -6492,7 +6500,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo }; return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pTableScanNode->dataRequired, - pScanPhyNode->count, pScanPhyNode->reverse, pColList, pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo); + pScanPhyNode->count, pScanPhyNode->reverse, pColList, pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, + pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); @@ -6895,8 +6904,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } - STableGroupInfo group = {0}; - (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group); + (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo); if (NULL == (*pTaskInfo)->pRoot) { code = terrno; goto _complete; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8a0da52a8e..3ea51b61e8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -308,7 +308,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou // } blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); - initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); + initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false); while(1) { doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ce3de02c9f..4d5ea6cc7a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -271,9 +271,7 @@ static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCt static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { STableScanInfo* pTableScanInfo = pOperator->info; - SSDataBlock* pBlock = pTableScanInfo->pResBlock; - STableGroupInfo* pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo; - + SSDataBlock* pBlock = pTableScanInfo->pResBlock; *newgroup = false; while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { @@ -284,18 +282,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { pTableScanInfo->numOfBlocks += 1; tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info); - // todo opt - // if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) { - // STableQueryInfo** pTableQueryInfo = - // (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.uid, sizeof(pBlock->info.uid)); - // if (pTableQueryInfo == NULL) { - // break; - // } - // - // doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order); - // } - - // this function never returns error? uint32_t status = 0; int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); @@ -308,6 +294,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { continue; } + // reset the block to be 0 by default, this blockId is assigned by physical plan and is used by direct upstream operator. + pBlock->info.blockId = 0; return pBlock; } @@ -405,11 +393,11 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = numOfOutput; - pOperator->getNextFn = doTableScan; - pOperator->pTaskInfo = pTaskInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = numOfOutput; + pOperator->getNextFn = doTableScan; + pOperator->pTaskInfo = pTaskInfo; static int32_t cost = 0; pOperator->cost.openCost = ++cost; @@ -823,12 +811,12 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { int32_t tableNameSlotId = 1; SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId); - char* name = NULL; + char* tb = NULL; int32_t numOfRows = 0; char n[TSDB_TABLE_NAME_LEN] = {0}; - while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) { - STR_TO_VARSTR(n, name); + while ((tb = metaTbCursorNext(pInfo->pCur)) != NULL) { + STR_TO_VARSTR(n, tb); colDataAppend(pTableNameCol, numOfRows, n, false); numOfRows += 1; if (numOfRows >= pInfo->capacity) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8506cfc33d..85cba79510 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3600,7 +3600,9 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { return TSDB_CODE_OUT_OF_MEMORY; } - pQuery->precision = extractResultTsPrecision((SSelectStmt*)pQuery->pRoot); + if (nodeType(pQuery->pRoot) == QUERY_NODE_SELECT_STMT) { + pQuery->precision = extractResultTsPrecision((SSelectStmt*)pQuery->pRoot); + } } if (NULL != pCxt->pDbs) { diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 1c655fc2bf..2d741b18f6 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -476,6 +476,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t return (void*)buf; } +// todo remove it // order array void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) { taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);