224 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C
		
	
	
	
			
		
		
	
	
			224 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C
		
	
	
	
| /*
 | |
|  * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 | |
|  *
 | |
|  * This program is free software: you can use, redistribute, and/or modify
 | |
|  * it under the terms of the GNU Affero General Public License, version 3
 | |
|  * or later ("AGPL"), as published by the Free Software Foundation.
 | |
|  *
 | |
|  * This program is distributed in the hope that it will be useful, but WITHOUT
 | |
|  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 | |
|  * FITNESS FOR A PARTICULAR PURPOSE.
 | |
|  *
 | |
|  * You should have received a copy of the GNU Affero General Public License
 | |
|  * along with this program. If not, see <http://www.gnu.org/licenses/>.
 | |
|  */
 | |
| #ifndef TDENGINE_EXECUTIL_H
 | |
| #define TDENGINE_EXECUTIL_H
 | |
| 
 | |
| #include "executor.h"
 | |
| #include "function.h"
 | |
| #include "nodes.h"
 | |
| #include "plannodes.h"
 | |
| #include "storageapi.h"
 | |
| #include "tcommon.h"
 | |
| #include "tpagedbuf.h"
 | |
| #include "tsimplehash.h"
 | |
| 
 | |
| #define T_LONG_JMP(_obj, _c)                                                              \
 | |
|   do {                                                                                    \
 | |
|     qError("error happens at %s, line:%d, code:%s", __func__, __LINE__, tstrerror((_c))); \
 | |
|     longjmp((_obj), (_c));                                                                \
 | |
|   } while (0)
 | |
| 
 | |
| #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid)           \
 | |
|   do {                                                     \
 | |
|     assert(sizeof(_uid) == sizeof(uint64_t));              \
 | |
|     *(uint64_t*)(_k) = (_uid);                             \
 | |
|     (void)memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \
 | |
|   } while (0)
 | |
| 
 | |
| #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
 | |
| 
 | |
| typedef struct SGroupResInfo {
 | |
|   int32_t index;    // rows consumed in func:doCopyToSDataBlockXX
 | |
|   int32_t iter;     // relate to index-1, last consumed data's slot id in hash table
 | |
|   void*   dataPos;  // relate to index-1, last consumed data's position, in the nodelist of cur slot
 | |
|   SArray* pRows;    // SArray<SResKeyPos>
 | |
|   char*   pBuf;
 | |
|   bool    freeItem;
 | |
| } SGroupResInfo;
 | |
| 
 | |
| typedef struct SResultRow {
 | |
|   int32_t                    pageId;  // pageId & rowId is the position of current result in disk-based output buffer
 | |
|   int32_t                    offset : 29;  // row index in buffer page
 | |
|   bool                       startInterp;  // the time window start timestamp has done the interpolation already.
 | |
|   bool                       endInterp;    // the time window end timestamp has done the interpolation already.
 | |
|   bool                       closed;       // this result status: closed or opened
 | |
|   uint32_t                   numOfRows;    // number of rows of current time window
 | |
|   STimeWindow                win;
 | |
|   struct SResultRowEntryInfo pEntryInfo[];  // For each result column, there is a resultInfo
 | |
| } SResultRow;
 | |
| 
 | |
| typedef struct SResultRowPosition {
 | |
|   int32_t pageId;
 | |
|   int32_t offset;
 | |
| } SResultRowPosition;
 | |
| 
 | |
| typedef struct SResKeyPos {
 | |
|   SResultRowPosition pos;
 | |
|   uint64_t           groupId;
 | |
|   char               key[];
 | |
| } SResKeyPos;
 | |
| 
 | |
| typedef struct SResultRowInfo {
 | |
|   int32_t            size;  // number of result set
 | |
|   SResultRowPosition cur;
 | |
|   SList*             openWindow;
 | |
| } SResultRowInfo;
 | |
| 
 | |
| typedef struct SColMatchItem {
 | |
|   int32_t   colId;
 | |
|   int32_t   srcSlotId;
 | |
|   int32_t   dstSlotId;
 | |
|   bool      needOutput;
 | |
|   SDataType dataType;
 | |
|   int32_t   funcType;
 | |
|   bool      isPk;
 | |
| } SColMatchItem;
 | |
| 
 | |
| typedef struct SColMatchInfo {
 | |
|   SArray* pList;      // SArray<SColMatchItem>
 | |
|   int32_t matchType;  // determinate the source according to col id or slot id
 | |
| } SColMatchInfo;
 | |
| 
 | |
| typedef struct SExecTaskInfo SExecTaskInfo;
 | |
| 
 | |
| typedef struct STableListIdInfo {
 | |
|   uint64_t suid;
 | |
|   uint64_t uid;
 | |
|   int32_t  tableType;
 | |
| } STableListIdInfo;
 | |
| 
 | |
| // If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
 | |
| // The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
 | |
| typedef struct STableListInfo {
 | |
|   bool             oneTableForEachGroup;
 | |
|   int32_t          numOfOuputGroups;  // the data block will be generated one by one
 | |
|   int32_t*         groupOffset;       // keep the offset value for each group in the tableList
 | |
|   SArray*          pTableList;
 | |
|   SHashObj*        map;           // speedup acquire the tableQueryInfo by table uid
 | |
|   SHashObj*        remainGroups;  // remaining group has not yet processed the empty group
 | |
|   STableListIdInfo idInfo;        // this maybe the super table or ordinary table
 | |
| } STableListInfo;
 | |
| 
 | |
| struct SqlFunctionCtx;
 | |
| 
 | |
| int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
 | |
|                                 STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
 | |
|                                 SExecTaskInfo* pTaskInfo);
 | |
| 
 | |
| STableListInfo* tableListCreate();
 | |
| void            tableListDestroy(STableListInfo* pTableListInfo);
 | |
| void            tableListClear(STableListInfo* pTableListInfo);
 | |
| int32_t         tableListGetOutputGroups(const STableListInfo* pTableList);
 | |
| bool            oneTableForEachGroup(const STableListInfo* pTableList);
 | |
| uint64_t        tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
 | |
| int32_t         tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
 | |
| int32_t         tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo,
 | |
|                                       int32_t* num);
 | |
| int32_t         tableListGetSize(const STableListInfo* pTableList, int32_t* pRes);
 | |
| uint64_t        tableListGetSuid(const STableListInfo* pTableList);
 | |
| STableKeyInfo*  tableListGetInfo(const STableListInfo* pTableList, int32_t index);
 | |
| int32_t         tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex);
 | |
| void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type);
 | |
| 
 | |
| size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
 | |
| void   initResultRowInfo(SResultRowInfo* pResultRowInfo);
 | |
| void   closeResultRow(SResultRow* pResultRow);
 | |
| void   resetResultRow(SResultRow* pResultRow, size_t entrySize);
 | |
| 
 | |
| struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset);
 | |
| 
 | |
| static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos, bool forUpdate) {
 | |
|   SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId);
 | |
|   if (!bufPage) {
 | |
|     uFatal("failed to get the buffer page:%d since %s", pos->pageId, terrstr());
 | |
|     return NULL;
 | |
|   }
 | |
|   if (forUpdate) {
 | |
|     setBufPageDirty(bufPage, true);
 | |
|   }
 | |
| 
 | |
|   SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset);
 | |
|   return pRow;
 | |
| }
 | |
| 
 | |
| int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order);
 | |
| void    cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
 | |
| 
 | |
| void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
 | |
| bool hasRemainResults(SGroupResInfo* pGroupResInfo);
 | |
| 
 | |
| int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
 | |
| 
 | |
| SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode);
 | |
| int32_t      prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo);
 | |
| 
 | |
| EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
 | |
| int32_t  getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId,
 | |
|                                SStorageAPI* pAPI);
 | |
| size_t   getTableTagsBufLen(const SNodeList* pGroups);
 | |
| 
 | |
| SArray* createSortInfo(SNodeList* pNodeList);
 | |
| SArray* makeColumnArrayFromList(SNodeList* pNodeList);
 | |
| int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
 | |
|                             int32_t type, SColMatchInfo* pMatchInfo);
 | |
| 
 | |
| int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId);
 | |
| int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode);
 | |
| int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs);
 | |
| 
 | |
| SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
 | |
|                                      SFunctionStateStore* pStore);
 | |
| int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn);
 | |
| int32_t initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
 | |
| 
 | |
| SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
 | |
| SColumn   extractColumnFromColumnNode(SColumnNode* pColNode);
 | |
| 
 | |
| int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode,
 | |
|                                const SReadHandle* readHandle);
 | |
| void    cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
 | |
| 
 | |
| int32_t convertFillType(int32_t mode);
 | |
| int32_t resultrowComparAsc(const void* p1, const void* p2);
 | |
| int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI);
 | |
| char*   getStreamOpName(uint16_t opType);
 | |
| void    printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr);
 | |
| void    printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr);
 | |
| 
 | |
| TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols);
 | |
| void  updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta);
 | |
| 
 | |
| SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
 | |
|                                         SStorageAPI* pStorageAPI);
 | |
| 
 | |
| /**
 | |
|  * @brief build a tuple into keyBuf
 | |
|  * @param [out] keyBuf the output buf
 | |
|  * @param [in] pSortGroupCols the cols to build
 | |
|  * @param [in] pBlock block the tuple in
 | |
|  */
 | |
| int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex);
 | |
| 
 | |
| int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pDataBlock,
 | |
|                  int32_t rowIndex);
 | |
| 
 | |
| uint64_t calcGroupId(char* pData, int32_t len);
 | |
| 
 | |
| SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys);
 | |
| 
 | |
| int32_t extractKeysLen(const SArray* keys, int32_t* pLen);
 | |
| 
 | |
| #endif  // TDENGINE_EXECUTIL_H
 |