[td-11818] 1. Add orderby unit test; 2. refactor API;

This commit is contained in:
Haojun Liao 2022-02-08 10:21:00 +08:00
parent 47c88a85d4
commit c57e99e4c7
13 changed files with 1023 additions and 461 deletions

View File

@ -13,7 +13,7 @@
namespace { namespace {
// simple test // simple test
void simpleTest() { void simpleTest() {
SDiskbasedResultBuf* pResultBuf = NULL; SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, 1); int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, 1);
int32_t pageId = 0; int32_t pageId = 0;
@ -55,7 +55,7 @@ void simpleTest() {
} }
void writeDownTest() { void writeDownTest() {
SDiskbasedResultBuf* pResultBuf = NULL; SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1); int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0; int32_t pageId = 0;
@ -102,7 +102,7 @@ void writeDownTest() {
} }
void recyclePageTest() { void recyclePageTest() {
SDiskbasedResultBuf* pResultBuf = NULL; SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1); int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0; int32_t pageId = 0;

View File

@ -45,7 +45,10 @@ size_t colDataGetNumOfRows(const SSDataBlock* pBlock);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc); int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize); int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize);
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
size_t blockDataGetSize(const SSDataBlock* pBlock); size_t blockDataGetSize(const SSDataBlock* pBlock);
size_t blockDataGetRowSize(const SSDataBlock* pBlock); size_t blockDataGetRowSize(const SSDataBlock* pBlock);

View File

@ -24,7 +24,7 @@ typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param);
typedef struct SLoserTreeNode { typedef struct SLoserTreeNode {
int32_t index; int32_t index;
void *pData; void *pData; // TODO remove it?
} SLoserTreeNode; } SLoserTreeNode;
typedef struct SLoserTreeInfo { typedef struct SLoserTreeInfo {
@ -35,7 +35,7 @@ typedef struct SLoserTreeInfo {
SLoserTreeNode *pNode; SLoserTreeNode *pNode;
} SLoserTreeInfo; } SLoserTreeInfo;
uint32_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); int32_t tLoserTreeCreate(SLoserTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn);
void tLoserTreeInit(SLoserTreeInfo *pTree); void tLoserTreeInit(SLoserTreeInfo *pTree);

View File

@ -26,54 +26,8 @@ extern "C" {
#include "tlockfree.h" #include "tlockfree.h"
typedef struct SArray* SIDList; typedef struct SArray* SIDList;
typedef struct SPageInfo SPageInfo;
typedef struct SPageDiskInfo { typedef struct SDiskbasedBuf SDiskbasedBuf;
int32_t offset;
int32_t length;
} SPageDiskInfo;
// Bookkeeping record for a single buffer page: its id, its list node, its
// offset/length descriptor and a pointer to its data.
typedef struct SPageInfo {
SListNode* pn; // point to list node
int32_t pageId; // identifier of this page
SPageDiskInfo info; // offset/length pair; PAGE_INFO_INITIALIZER sets both fields to -1
void* pData; // page data; NOTE(review): presumably NULL while the page is not held in memory -- confirm
bool used; // set while the current page is in use
} SPageInfo;
// An (offset, len) pair.
// NOTE(review): presumably one reusable region of the backing disk file
// (cf. the "free area in file" array of the result buffer) -- confirm against the implementation.
typedef struct SFreeListItem {
int32_t offset; // start of the region
int32_t len; // length of the region
} SFreeListItem;
// Counters kept per result buffer.
// NOTE(review): the meanings below are read off the field names only -- confirm against the code that updates them.
typedef struct SResultBufStatis {
int32_t flushBytes; // presumably bytes flushed to disk
int32_t loadBytes; // presumably bytes loaded back from disk
int32_t getPages; // presumably number of page get requests
int32_t releasePages; // presumably number of page releases
int32_t flushPages; // presumably number of pages flushed to disk
} SResultBufStatis;
// State of a paged result buffer that is backed by a disk file.
// Fields without an original comment are annotated from their names only; treat those notes as
// assumptions to be confirmed against the implementation.
typedef struct SDiskbasedResultBuf {
int32_t numOfPages; // NOTE(review): presumably total number of pages -- confirm
int64_t totalBufSize; // NOTE(review): presumably total buffer size in bytes -- confirm
int64_t fileSize; // disk file size
FILE* file; // stream handle of the backing file
int32_t allocateId; // allocated page id
char* path; // file path
int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* groupSet; // id hash table
SHashObj* all; // NOTE(review): presumably a lookup table over all pages -- confirm
SList* lruList; // NOTE(review): presumably pages in least-recently-used order -- confirm
void* emptyDummyIdList; // dummy id list
void* assistBuf; // assistant buffer for compress/decompress data
SArray* pFree; // free area in file
bool comp; // compressed before flushed to disk
int32_t nextPos; // next page flush position
uint64_t qId; // for debug purpose
SResultBufStatis statis; // counters, see SResultBufStatis
} SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1} #define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
@ -93,7 +47,7 @@ typedef struct SFilePage {
* @param handle * @param handle
* @return * @return
*/ */
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); int32_t createDiskbasedResultBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir);
/** /**
* *
@ -102,7 +56,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa
* @param pageId * @param pageId
* @return * @return
*/ */
SFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId); SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pageId);
/** /**
* *
@ -110,7 +64,7 @@ SFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
* @param groupId * @param groupId
* @return * @return
*/ */
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId);
/** /**
* get the specified buffer page by id * get the specified buffer page by id
@ -118,49 +72,69 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
* @param id * @param id
* @return * @return
*/ */
SFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id); SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id);
/** /**
* release the referenced buf pages * release the referenced buf pages
* @param pResultBuf * @param pResultBuf
* @param page * @param page
*/ */
void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page); void releaseResBufPage(SDiskbasedBuf* pResultBuf, void* page);
/** /**
* *
* @param pResultBuf * @param pResultBuf
* @param pi * @param pi
*/ */
void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi); void releaseResBufPageInfo(SDiskbasedBuf* pResultBuf, struct SPageInfo* pi);
/** /**
* get the total buffer size in the format of disk file * get the total buffer size in the format of disk file
* @param pResultBuf * @param pResultBuf
* @return * @return
*/ */
size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf); size_t getResBufSize(const SDiskbasedBuf* pResultBuf);
/** /**
* get the number of groups in the result buffer * get the number of groups in the result buffer
* @param pResultBuf * @param pResultBuf
* @return * @return
*/ */
size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf); size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pResultBuf);
/** /**
* destroy result buffer * destroy result buffer
* @param pResultBuf * @param pResultBuf
*/ */
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf); void destroyResultBuf(SDiskbasedBuf* pResultBuf);
/** /**
* *
* @param pList * @param pList
* @return * @return
*/ */
SPageInfo* getLastPageInfo(SIDList pList); struct SPageInfo* getLastPageInfo(SIDList pList);
/**
*
* @param pPgInfo
* @return
*/
int32_t getPageId(const struct SPageInfo* pPgInfo);
/**
* Return the buffer page size.
* @param pResultBuf
* @return
*/
int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf);
/**
*
* @param pResultBuf
* @return
*/
bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -246,9 +246,11 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
} }
pColumnInfoData->varmeta.offset = (int32_t*) p; pColumnInfoData->varmeta.offset = (int32_t*) p;
memcpy(pColumnInfoData->varmeta.offset + sizeof(int32_t) * numOfRow1, pSource->varmeta.offset, sizeof(int32_t) * numOfRow2); for(int32_t i = 0; i < numOfRow2; ++i) {
pColumnInfoData->varmeta.offset[i + numOfRow1] = pSource->varmeta.offset[i] + pColumnInfoData->varmeta.length;
}
// copy the // copy data
uint32_t len = pSource->varmeta.length; uint32_t len = pSource->varmeta.length;
uint32_t oldLen = pColumnInfoData->varmeta.length; uint32_t oldLen = pColumnInfoData->varmeta.length;
if (pColumnInfoData->varmeta.allocLen < len + oldLen) { if (pColumnInfoData->varmeta.allocLen < len + oldLen) {
@ -261,7 +263,8 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
pColumnInfoData->varmeta.allocLen = len + oldLen; pColumnInfoData->varmeta.allocLen = len + oldLen;
} }
memcpy(pColumnInfoData->pData + oldLen, pSource->pData + sizeof(int32_t), len); memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
pColumnInfoData->varmeta.length = len + oldLen;
} else { } else {
doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2); doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2);
@ -414,18 +417,62 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
} }
} }
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) {
if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) {
return NULL;
}
SSDataBlock* pDst = calloc(1, sizeof(SSDataBlock));
pDst->info = pBlock->info;
pDst->info.rows = 0;
pDst->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0};
SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, i);
colInfo.info = pSrcCol->info;
if (IS_VAR_DATA_TYPE(pSrcCol->info.type)) {
SVarColAttr* pAttr = &colInfo.varmeta;
pAttr->offset = calloc(rowCount, sizeof(int32_t));
} else {
colInfo.nullbitmap = calloc(1, BitmapLen(rowCount));
colInfo.pData = calloc(rowCount, colInfo.info.bytes);
}
taosArrayPush(pDst->pDataBlock, &colInfo);
}
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg);
char* p = colDataGet(pColData, j);
colDataAppend(pDstCol, j - startIndex, p, isNull);
}
}
pDst->info.rows = rowCount;
return pDst;
}
/** /**
* *
* +---------------------------+---------------------+ * +------------------+---------------+--------------------+
* |the number of rows(4 bytes)| column #1 | * |the number of rows| column length | column #1 |
* |---------------------+ * | (4 bytes) | (4 bytes) |--------------------+
* | | null bitmap| values | * | | | null bitmap| values|
* +---------------------------+---------------------+ * +------------------+---------------+--------------------+
* @param buf * @param buf
* @param pBlock * @param pBlock
* @return * @return
*/ */
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { // TODO add the column length!!
ASSERT(pBlock != NULL); ASSERT(pBlock != NULL);
// write the number of rows // write the number of rows
@ -447,6 +494,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
} }
uint32_t dataSize = colDataGetSize(pCol, numOfRows); uint32_t dataSize = colDataGetSize(pCol, numOfRows);
*(int32_t*) pStart = dataSize;
pStart += sizeof(int32_t);
memcpy(pStart, pCol->pData, dataSize); memcpy(pStart, pCol->pData, dataSize);
pStart += dataSize; pStart += dataSize;
} }
@ -454,6 +505,48 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
return 0; return 0;
} }
/**
 * Rebuild the content of a data block from a serialized buffer.
 *
 * Layout read by this function:
 *   - number of rows (4 bytes)
 *   - for every column of pBlock, in order:
 *       - the offset array (rows * 4 bytes) for a var-length column, or the null bitmap
 *         (BitmapLen(rows) bytes) for a fixed-length column
 *       - the length of the column data in bytes (4 bytes)
 *       - the column data itself
 *
 * The number and the types of the columns are taken from pBlock, not from the buffer, so pBlock
 * must already describe the same schema as the block that was serialized.
 *
 * @param pBlock destination block; pBlock->info.rows and the buffers of each column are overwritten
 * @param buf    serialized block
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY (also stored in terrno) if a buffer cannot be
 *         allocated, in which case pBlock may be partially updated
 */
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
  ASSERT(pBlock != NULL && buf != NULL);

  // The buffer gives no alignment guarantee (a bitmap may have any length), so every 32-bit
  // value is fetched with memcpy() instead of a pointer cast.
  int32_t numOfRows = 0;
  memcpy(&numOfRows, buf, sizeof(int32_t));
  ASSERT(numOfRows >= 0);

  pBlock->info.rows = numOfRows;

  int32_t     numOfCols = pBlock->info.numOfCols;
  const char* pStart = buf + sizeof(uint32_t);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    bool             isVarCol = IS_VAR_DATA_TYPE(pCol->info.type);

    // 1. per-row meta data: offsets for var-length columns, null bitmap otherwise.
    //    A zero-sized realloc() is skipped, since it may free the buffer and return NULL.
    if (isVarCol) {
      size_t metaSize = (size_t)numOfRows * sizeof(int32_t);
      if (metaSize > 0) {
        int32_t* p = realloc(pCol->varmeta.offset, metaSize);
        if (p == NULL) {  // the previous offset array is still owned by pCol
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return terrno;
        }

        pCol->varmeta.offset = p;
        memcpy(pCol->varmeta.offset, pStart, metaSize);
        pStart += metaSize;
      }
    } else {
      size_t bitmapSize = (size_t)BitmapLen(numOfRows);
      if (bitmapSize > 0) {
        char* p = realloc(pCol->nullbitmap, bitmapSize);
        if (p == NULL) {  // the previous bitmap is still owned by pCol
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return terrno;
        }

        pCol->nullbitmap = p;
        memcpy(pCol->nullbitmap, pStart, bitmapSize);
        pStart += bitmapSize;
      }
    }

    // 2. length of the column data
    int32_t colLength = 0;
    memcpy(&colLength, pStart, sizeof(int32_t));
    ASSERT(colLength >= 0);
    pStart += sizeof(int32_t);

    // 3. make sure the data buffer can take colLength bytes
    if (pCol->pData == NULL) {
      // Keep the historical default capacity, but never allocate less than what is about to be copied.
      size_t capacity = (size_t)pCol->info.bytes * 4096;  // TODO refactor the memory mgmt
      if (capacity < (size_t)colLength) {
        capacity = (size_t)colLength;
      }

      pCol->pData = malloc(capacity);
      if (pCol->pData == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return terrno;
      }

      if (isVarCol) {
        pCol->varmeta.allocLen = (uint32_t)capacity;
      }
    } else if (isVarCol && pCol->varmeta.allocLen < (uint32_t)colLength) {
      char* p = realloc(pCol->pData, (size_t)colLength);
      if (p == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return terrno;
      }

      pCol->pData = p;
      pCol->varmeta.allocLen = (uint32_t)colLength;
    }
    // NOTE(review): the capacity of an already allocated fixed-length buffer is not tracked in
    // the column, so it cannot be verified here -- the caller must make sure it holds colLength bytes.

    // 4. column data
    if (colLength > 0) {
      memcpy(pCol->pData, pStart, (size_t)colLength);
    }

    if (isVarCol) {
      // record how many bytes of the payload buffer are in use, as appenders and mergers rely on it
      pCol->varmeta.length = (uint32_t)colLength;
    }

    pStart += colLength;
  }

  return 0;
}
size_t blockDataGetRowSize(const SSDataBlock* pBlock) { size_t blockDataGetRowSize(const SSDataBlock* pBlock) {
ASSERT(pBlock != NULL); ASSERT(pBlock != NULL);
size_t rowSize = 0; size_t rowSize = 0;
@ -507,12 +600,12 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
switch(pColInfoData->info.type) { switch(pColInfoData->info.type) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
if (*(int32_t*) left1 == *(int32_t*) right1) { if (*(int32_t*) left1 == *(int32_t*) right1) {
continue;// TODO continue break;
} else { } else {
if (pOrder->order == TSDB_ORDER_ASC) { if (pOrder->order == TSDB_ORDER_ASC) {
return (*(int32_t*) left1 < *(int32_t*) right1)? -1:1; return (*(int32_t*) left1 <= *(int32_t*) right1)? -1:1;
} else { } else {
return (*(int32_t*) left1 < *(int32_t*) right1)? 1:-1; return (*(int32_t*) left1 <= *(int32_t*) right1)? 1:-1;
} }
} }
} }
@ -624,10 +717,13 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
return terrno; return terrno;
} }
int64_t p0 = taosGetTimestampUs();
SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar); taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
int32_t numOfCols = pDataBlock->info.numOfCols; int64_t p1 = taosGetTimestampUs();
SColumnInfoData* pCols = createHelpColInfoData(pDataBlock); SColumnInfoData* pCols = createHelpColInfoData(pDataBlock);
if (pCols == NULL) { if (pCols == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -640,7 +736,10 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
printf("%d, %d, %d\n", index[i], ((int32_t*)px->pData)[i], ((int32_t*)px->pData)[index[i]]); printf("%d, %d, %d\n", index[i], ((int32_t*)px->pData)[i], ((int32_t*)px->pData)[index[i]]);
} }
#endif #endif
int64_t p2 = taosGetTimestampUs();
blockDataAssign(pCols, pDataBlock, index); blockDataAssign(pCols, pDataBlock, index);
int64_t p3 = taosGetTimestampUs();
#if 0 #if 0
for(int32_t i = 0; i < pDataBlock->info.rows; ++i) { for(int32_t i = 0; i < pDataBlock->info.rows; ++i) {
@ -655,5 +754,8 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
#endif #endif
copyBackToBlock(pDataBlock, pCols); copyBackToBlock(pDataBlock, pCols);
int64_t p4 = taosGetTimestampUs();
printf("sort:%ld, create:%ld, assign:%ld, copyback:%ld\n", p1-p0, p2 - p1, p3 - p2, p4-p3);
destroyTupleIndex(index); destroyTupleIndex(index);
} }

View File

@ -15,8 +15,13 @@
#ifndef TDENGINE_EXECUTORIMPL_H #ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H #define TDENGINE_EXECUTORIMPL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h" #include "os.h"
#include "common.h" #include "common.h"
#include "tlosertree.h"
#include "ttszip.h" #include "ttszip.h"
#include "tvariant.h" #include "tvariant.h"
@ -36,14 +41,14 @@ struct SColumnFilterElem;
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) #define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL ? 0 : ((_r)->outputBuf)->info.rows)
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData? 1 : 0) #define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
enum { enum {
// when this task starts to execute, this status will set // when this task starts to execute, this status will set
@ -62,8 +67,8 @@ enum {
}; };
typedef struct SResultRowCell { typedef struct SResultRowCell {
uint64_t groupId; uint64_t groupId;
SResultRow *pRow; SResultRow* pRow;
} SResultRowCell; } SResultRowCell;
/** /**
@ -80,24 +85,24 @@ typedef struct SColumnFilterElem {
int16_t bytes; // column length int16_t bytes; // column length
__filter_func_t fp; __filter_func_t fp;
SColumnFilterInfo filterInfo; SColumnFilterInfo filterInfo;
void *q; void* q;
} SColumnFilterElem; } SColumnFilterElem;
typedef struct SSingleColumnFilterInfo { typedef struct SSingleColumnFilterInfo {
void* pData; void* pData;
void* pData2; //used for nchar column void* pData2; // used for nchar column
int32_t numOfFilters; int32_t numOfFilters;
SColumnInfo info; SColumnInfo info;
SColumnFilterElem* pFilters; SColumnFilterElem* pFilters;
} SSingleColumnFilterInfo; } SSingleColumnFilterInfo;
typedef struct STableQueryInfo { typedef struct STableQueryInfo {
TSKEY lastKey; TSKEY lastKey;
int32_t groupIndex; // group id in table list int32_t groupIndex; // group id in table list
SVariant tag; SVariant tag;
STimeWindow win; // todo remove it later STimeWindow win; // todo remove it later
STSCursor cur; STSCursor cur;
void* pTable; // for retrieve the page id list void* pTable; // for retrieve the page id list
SResultRowInfo resInfo; SResultRowInfo resInfo;
} STableQueryInfo; } STableQueryInfo;
@ -109,11 +114,11 @@ typedef enum {
typedef struct { typedef struct {
EQueryProfEventType eventType; EQueryProfEventType eventType;
int64_t eventTime; int64_t eventTime;
union { union {
uint8_t operatorType; //for operator event uint8_t operatorType; // for operator event
int32_t abortCode; //for query abort event int32_t abortCode; // for query abort event
}; };
} SQueryProfEvent; } SQueryProfEvent;
@ -124,33 +129,33 @@ typedef struct {
} SOperatorProfResult; } SOperatorProfResult;
typedef struct STaskCostInfo { typedef struct STaskCostInfo {
int64_t created; int64_t created;
int64_t start; int64_t start;
int64_t end; int64_t end;
uint64_t loadStatisTime; uint64_t loadStatisTime;
uint64_t loadFileBlockTime; uint64_t loadFileBlockTime;
uint64_t loadDataInCacheTime; uint64_t loadDataInCacheTime;
uint64_t loadStatisSize; uint64_t loadStatisSize;
uint64_t loadFileBlockSize; uint64_t loadFileBlockSize;
uint64_t loadDataInCacheSize; uint64_t loadDataInCacheSize;
uint64_t loadDataTime; uint64_t loadDataTime;
uint64_t totalRows; uint64_t totalRows;
uint64_t totalCheckedRows; uint64_t totalCheckedRows;
uint32_t totalBlocks; uint32_t totalBlocks;
uint32_t loadBlocks; uint32_t loadBlocks;
uint32_t loadBlockStatis; uint32_t loadBlockStatis;
uint32_t discardBlocks; uint32_t discardBlocks;
uint64_t elapsedTime; uint64_t elapsedTime;
uint64_t firstStageMergeTime; uint64_t firstStageMergeTime;
uint64_t winInfoSize; uint64_t winInfoSize;
uint64_t tableInfoSize; uint64_t tableInfoSize;
uint64_t hashSize; uint64_t hashSize;
uint64_t numOfTimeWindows; uint64_t numOfTimeWindows;
SArray *queryProfEvents; //SArray<SQueryProfEvent> SArray* queryProfEvents; // SArray<SQueryProfEvent>
SHashObj *operatorProfResults; //map<operator_type, SQueryProfEvent> SHashObj* operatorProfResults; // map<operator_type, SQueryProfEvent>
} STaskCostInfo; } STaskCostInfo;
typedef struct { typedef struct {
@ -166,67 +171,67 @@ typedef struct {
// The basic query information extracted from the SQueryInfo tree to support the // The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node. // execution of query in a data node.
typedef struct STaskAttr { typedef struct STaskAttr {
SLimit limit; SLimit limit;
SLimit slimit; SLimit slimit;
// todo comment it // todo comment it
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag bool topBotQuery; // TODO used bitwise flag
bool groupbyColumn; // denote if this is a groupby normal column query bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation bool timeWindowInterpo; // if the time window start/end required interpolation
bool queryBlockDist; // if query data block distribution bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query bool stabledev; // super table stddev query
bool tsCompQuery; // is tscomp query bool tsCompQuery; // is tscomp query
bool diffQuery; // is diff query bool diffQuery; // is diff query
bool simpleAgg; bool simpleAgg;
bool pointInterpQuery; // point interpolation query bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan bool needReverseScan; // need reverse scan
bool distinct; // distinct query or not bool distinct; // distinct query or not
bool stateWindow; // window State on sub/normal table bool stateWindow; // window State on sub/normal table
bool createFilterOperator; // if filter operator is needed bool createFilterOperator; // if filter operator is needed
bool multigroupResult; // multigroup result can exist in one SSDataBlock bool multigroupResult; // multigroup result can exist in one SSDataBlock
int32_t interBufSize; // intermediate buffer size int32_t interBufSize; // intermediate buffer size
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
SOrder order; SOrder order;
int16_t numOfCols; int16_t numOfCols;
int16_t numOfTags; int16_t numOfTags;
STimeWindow window; STimeWindow window;
SInterval interval; SInterval interval;
SSessionWindow sw; SSessionWindow sw;
int16_t precision; int16_t precision;
int16_t numOfOutput; int16_t numOfOutput;
int16_t fillType; int16_t fillType;
int32_t srcRowSize; // todo extract struct int32_t srcRowSize; // todo extract struct
int32_t resultRowSize; int32_t resultRowSize;
int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query. int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query.
int32_t maxTableColumnWidth; int32_t maxTableColumnWidth;
int32_t tagLen; // tag value length of current query int32_t tagLen; // tag value length of current query
SGroupbyExpr *pGroupbyExpr; SGroupbyExpr* pGroupbyExpr;
SExprInfo* pExpr1; SExprInfo* pExpr1;
SExprInfo* pExpr2; SExprInfo* pExpr2;
int32_t numOfExpr2; int32_t numOfExpr2;
SExprInfo* pExpr3; SExprInfo* pExpr3;
int32_t numOfExpr3; int32_t numOfExpr3;
SColumnInfo* tableCols; SColumnInfo* tableCols;
SColumnInfo* tagColList; SColumnInfo* tagColList;
int32_t numOfFilterCols; int32_t numOfFilterCols;
int64_t* fillVal; int64_t* fillVal;
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
// SFilterInfo *pFilters; // SFilterInfo *pFilters;
void* tsdb; void* tsdb;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId; int32_t vgId;
SArray *pUdfInfo; // no need to free SArray* pUdfInfo; // no need to free
} STaskAttr; } STaskAttr;
typedef int32_t (*__optr_prepare_fn_t)(void* param); typedef int32_t (*__optr_prepare_fn_t)(void* param);
@ -236,176 +241,172 @@ typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
struct SOperatorInfo; struct SOperatorInfo;
typedef struct STaskIdInfo { typedef struct STaskIdInfo {
uint64_t queryId; // this is also a request id uint64_t queryId; // this is also a request id
uint64_t subplanId; uint64_t subplanId;
uint64_t templateId; uint64_t templateId;
char *str; char* str;
} STaskIdInfo; } STaskIdInfo;
typedef struct SExecTaskInfo { typedef struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
char *content; char* content;
uint32_t status; uint32_t status;
STimeWindow window; STimeWindow window;
STaskCostInfo cost; STaskCostInfo cost;
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
int32_t code; int32_t code;
uint64_t totalRows; // total number of rows uint64_t totalRows; // total number of rows
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char *sql; // query sql string char* sql; // query sql string
jmp_buf env; // jmp_buf env; //
struct SOperatorInfo *pRoot; struct SOperatorInfo* pRoot;
} SExecTaskInfo; } SExecTaskInfo;
typedef struct STaskRuntimeEnv { typedef struct STaskRuntimeEnv {
jmp_buf env; jmp_buf env;
STaskAttr* pQueryAttr; STaskAttr* pQueryAttr;
uint32_t status; // query status uint32_t status; // query status
void* qinfo; void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not uint8_t scanFlag; // denotes reversed scan of data or not
void* pTsdbReadHandle; void* pTsdbReadHandle;
int32_t prevGroupId; // previous executed group id int32_t prevGroupId; // previous executed group id
bool enableGroupData; bool enableGroupData;
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SResultRowPool* pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. SResultRowPool*
char** prevRow; pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
char** prevRow;
SArray* prevResult; // intermediate result, SArray<SInterResult> SArray* prevResult; // intermediate result, SArray<SInterResult>
STSBuf* pTsBuf; // timestamp filter list STSBuf* pTsBuf; // timestamp filter list
STSCursor cur; STSCursor cur;
char* tagVal; // tag value of current data block char* tagVal; // tag value of current data block
struct SScalarFunctionSupport * scalarSup; struct SScalarFunctionSupport* scalarSup;
SSDataBlock *outputBuf; SSDataBlock* outputBuf;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
struct SOperatorInfo *proot; struct SOperatorInfo* proot;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value int64_t currentOffset; // dynamic offset value
STableQueryInfo *current; STableQueryInfo* current;
SRspResultInfo resultInfo; SRspResultInfo resultInfo;
SHashObj *pTableRetrieveTsMap; SHashObj* pTableRetrieveTsMap;
struct SUdfInfo *pUdfInfo; struct SUdfInfo* pUdfInfo;
} STaskRuntimeEnv; } STaskRuntimeEnv;
enum { enum {
OP_IN_EXECUTING = 1, OP_IN_EXECUTING = 1,
OP_RES_TO_RETURN = 2, OP_RES_TO_RETURN = 2,
OP_EXEC_DONE = 3, OP_EXEC_DONE = 3,
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
uint8_t operatorType; uint8_t operatorType;
bool blockingOptr; // block operator or not bool blockingOptr; // block operator or not
uint8_t status; // denote if current operator is completed uint8_t status; // denote if current operator is completed
int32_t numOfOutput; // number of columns of the current operator results int32_t numOfOutput; // number of columns of the current operator results
char *name; // name, used to show the query execution plan char* name; // name, used to show the query execution plan
void *info; // extension attribution void* info; // extension attribution
SExprInfo *pExpr; SExprInfo* pExpr;
STaskRuntimeEnv *pRuntimeEnv; // todo remove it STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo *pTaskInfo; SExecTaskInfo* pTaskInfo;
struct SOperatorInfo **pDownstream; // downstream pointer list struct SOperatorInfo** pDownstream; // downstream pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE except for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE except for join operator
__optr_prepare_fn_t prepareFn; __optr_prepare_fn_t prepareFn;
__operator_fn_t exec; __operator_fn_t exec;
__optr_cleanup_fn_t cleanupFn; __optr_cleanup_fn_t cleanupFn;
} SOperatorInfo; } SOperatorInfo;
enum {
QUERY_RESULT_NOT_READY = 1,
QUERY_RESULT_READY = 2,
};
typedef struct { typedef struct {
int32_t numOfTags; int32_t numOfTags;
int32_t numOfCols; int32_t numOfCols;
SColumnInfo *colList; SColumnInfo* colList;
} SQueriedTableInfo; } SQueriedTableInfo;
typedef struct SQInfo { typedef struct SQInfo {
void* signature; void* signature;
uint64_t qId; uint64_t qId;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
STaskRuntimeEnv runtimeEnv; STaskRuntimeEnv runtimeEnv;
STaskAttr query; STaskAttr query;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads pthread_mutex_t lock; // used to synchronize the rsp/query threads
tsem_t ready; tsem_t ready;
int32_t dataReady; // denote if query result is ready or not int32_t dataReady; // denote if query result is ready or not
void* rspContext; // response context void* rspContext; // response context
int64_t startExecTs; // start to exec timestamp int64_t startExecTs; // start to exec timestamp
char* sql; // query sql string char* sql; // query sql string
STaskCostInfo summary; STaskCostInfo summary;
} SQInfo; } SQInfo;
typedef struct STaskParam { typedef struct STaskParam {
char *sql; char* sql;
char *tagCond; char* tagCond;
char *colCond; char* colCond;
char *tbnameCond; char* tbnameCond;
char *prevResult; char* prevResult;
SArray *pTableIdList; SArray* pTableIdList;
SSqlExpr **pExpr; SSqlExpr** pExpr;
SSqlExpr **pSecExpr; SSqlExpr** pSecExpr;
SExprInfo *pExprs; SExprInfo* pExprs;
SExprInfo *pSecExprs; SExprInfo* pSecExprs;
SFilterInfo *pFilters; SFilterInfo* pFilters;
SColIndex *pGroupColIndex; SColIndex* pGroupColIndex;
SColumnInfo *pTagColumnInfo; SColumnInfo* pTagColumnInfo;
SGroupbyExpr *pGroupbyExpr; SGroupbyExpr* pGroupbyExpr;
int32_t tableScanOperator; int32_t tableScanOperator;
SArray *pOperator; SArray* pOperator;
struct SUdfInfo *pUdfInfo; struct SUdfInfo* pUdfInfo;
} STaskParam; } STaskParam;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray *pSources; SArray* pSources;
tsem_t ready; tsem_t ready;
void *pTransporter; void* pTransporter;
SRetrieveTableRsp *pRsp; SRetrieveTableRsp* pRsp;
SSDataBlock *pResult; SSDataBlock* pResult;
int32_t current; int32_t current;
uint64_t rowsOfCurrentSource; uint64_t rowsOfCurrentSource;
uint64_t totalSize; // total load bytes from remote uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows uint64_t totalRows; // total number of rows
uint64_t totalElapsed;// total elapsed time uint64_t totalElapsed; // total elapsed time
} SExchangeInfo; } SExchangeInfo;
typedef struct STableScanInfo { typedef struct STableScanInfo {
void *pTsdbReadHandle; void* pTsdbReadHandle;
int32_t numOfBlocks; // extract basic running information. int32_t numOfBlocks; // extract basic running information.
int32_t numOfSkipped; int32_t numOfSkipped;
int32_t numOfBlockStatis; int32_t numOfBlockStatis;
int64_t numOfRows; int64_t numOfRows;
int32_t order; // scan order int32_t order; // scan order
int32_t times; // repeat counts int32_t times; // repeat counts
int32_t current; int32_t current;
int32_t reverseTimes; // 0 by default int32_t reverseTimes; // 0 by default
SqlFunctionCtx *pCtx; // next operator query context SqlFunctionCtx* pCtx; // next operator query context
SResultRowInfo *pResultRowInfo; SResultRowInfo* pResultRowInfo;
int32_t *rowCellInfoOffset; int32_t* rowCellInfoOffset;
SExprInfo *pExpr; SExprInfo* pExpr;
SSDataBlock block; SSDataBlock block;
int32_t numOfOutput; int32_t numOfOutput;
int64_t elapsedTime; int64_t elapsedTime;
int32_t prevGroupId; // previous table group id int32_t prevGroupId; // previous table group id
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
} STableScanInfo; } STableScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
@ -416,32 +417,33 @@ typedef struct STagScanInfo {
} STagScanInfo; } STagScanInfo;
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
SSDataBlock *pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
SColumnInfo *pCols; // the output column info SColumnInfo* pCols; // the output column info
uint64_t numOfRows; // total scanned rows uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
void *readerHandle;// stream block reader handle void* readerHandle; // stream block reader handle
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo; SResultRowInfo resultRowInfo;
int32_t *rowCellInfoOffset; // offset value for each row result cell info int32_t* rowCellInfoOffset; // offset value for each row result cell info
SqlFunctionCtx *pCtx; SqlFunctionCtx* pCtx;
SSDataBlock *pRes; SSDataBlock* pRes;
} SOptrBasicInfo; } SOptrBasicInfo;
typedef struct SOptrBasicInfo STableIntervalOperatorInfo; typedef struct SOptrBasicInfo STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
uint32_t seed; uint32_t seed;
SDiskbasedResultBuf *pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SResultRowPool* pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. SResultRowPool*
STableQueryInfo *current; pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
STableQueryInfo* current;
} SAggOperatorInfo; } SAggOperatorInfo;
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
@ -449,52 +451,52 @@ typedef struct SProjectOperatorInfo {
int32_t bufCapacity; int32_t bufCapacity;
uint32_t seed; uint32_t seed;
SSDataBlock *existDataBlock; SSDataBlock* existDataBlock;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SLimitOperatorInfo { typedef struct SLimitOperatorInfo {
int64_t limit; int64_t limit;
int64_t total; int64_t total;
} SLimitOperatorInfo; } SLimitOperatorInfo;
typedef struct SSLimitOperatorInfo { typedef struct SSLimitOperatorInfo {
int64_t groupTotal; int64_t groupTotal;
int64_t currentGroupOffset; int64_t currentGroupOffset;
int64_t rowsTotal; int64_t rowsTotal;
int64_t currentOffset; int64_t currentOffset;
SLimit limit; SLimit limit;
SLimit slimit; SLimit slimit;
char **prevRow; char** prevRow;
SArray *orderColumnList; SArray* orderColumnList;
bool hasPrev; bool hasPrev;
bool ignoreCurrentGroup; bool ignoreCurrentGroup;
bool multigroupResult; bool multigroupResult;
SSDataBlock *pRes; // result buffer SSDataBlock* pRes; // result buffer
SSDataBlock *pPrevBlock; SSDataBlock* pPrevBlock;
int64_t capacity; int64_t capacity;
int64_t threshold; int64_t threshold;
} SSLimitOperatorInfo; } SSLimitOperatorInfo;
typedef struct SFilterOperatorInfo { typedef struct SFilterOperatorInfo {
SSingleColumnFilterInfo *pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
int32_t numOfFilterCols; int32_t numOfFilterCols;
} SFilterOperatorInfo; } SFilterOperatorInfo;
typedef struct SFillOperatorInfo { typedef struct SFillOperatorInfo {
struct SFillInfo *pFillInfo; struct SFillInfo* pFillInfo;
SSDataBlock *pRes; SSDataBlock* pRes;
int64_t totalInputRows; int64_t totalInputRows;
void **p; void** p;
SSDataBlock *existNewGroupBlock; SSDataBlock* existNewGroupBlock;
bool multigroupResult; bool multigroupResult;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
int32_t colIndex; int32_t colIndex;
char *prevData; // previous group by value char* prevData; // previous group by value
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
typedef struct SSWindowOperatorInfo { typedef struct SSWindowOperatorInfo {
@ -503,16 +505,16 @@ typedef struct SSWindowOperatorInfo {
TSKEY prevTs; // previous timestamp TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows int32_t numOfRows; // number of rows
int32_t start; // start row index int32_t start; // start row index
bool reptScan; // next round scan bool reptScan; // next round scan
} SSWindowOperatorInfo; } SSWindowOperatorInfo;
typedef struct SStateWindowOperatorInfo { typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
STimeWindow curWindow; // current time window STimeWindow curWindow; // current time window
int32_t numOfRows; // number of rows int32_t numOfRows; // number of rows
int32_t colIndex; // start row index int32_t colIndex; // start row index
int32_t start; int32_t start;
char* prevData; // previous data char* prevData; // previous data
bool reptScan; bool reptScan;
} SStateWindowOperatorInfo; } SStateWindowOperatorInfo;
@ -523,80 +525,103 @@ typedef struct SDistinctDataInfo {
} SDistinctDataInfo; } SDistinctDataInfo;
typedef struct SDistinctOperatorInfo { typedef struct SDistinctOperatorInfo {
SHashObj *pSet; SHashObj* pSet;
SSDataBlock *pRes; SSDataBlock* pRes;
bool recordNullVal; //has already record the null value, no need to try again bool recordNullVal; // has already record the null value, no need to try again
int64_t threshold; int64_t threshold;
int64_t outputCapacity; int64_t outputCapacity;
int32_t totalBytes; int32_t totalBytes;
char* buf; char* buf;
SArray* pDistinctDataInfo; SArray* pDistinctDataInfo;
} SDistinctOperatorInfo; } SDistinctOperatorInfo;
struct SGlobalMerger; struct SGlobalMerger;
typedef struct SMultiwayMergeInfo { typedef struct SMultiwayMergeInfo {
struct SGlobalMerger *pMerge; struct SGlobalMerger* pMerge;
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
int32_t bufCapacity; int32_t bufCapacity;
int64_t seed; int64_t seed;
char **prevRow; char** prevRow;
SArray *orderColumnList; SArray* orderColumnList;
int32_t resultRowFactor; int32_t resultRowFactor;
bool hasGroupColData; bool hasGroupColData;
char **currentGroupColData; char** currentGroupColData;
SArray *groupColumnList; SArray* groupColumnList;
bool hasDataBlockForNewGroup; bool hasDataBlockForNewGroup;
SSDataBlock *pExistBlock; SSDataBlock* pExistBlock;
SArray *udfInfo; SArray* udfInfo;
bool hasPrev; bool hasPrev;
bool multiGroupResults; bool multiGroupResults;
} SMultiwayMergeInfo; } SMultiwayMergeInfo;
// todo support the disk-based sort
typedef struct SOrderOperatorInfo { typedef struct SOrderOperatorInfo {
uint32_t sortBufSize; // max buffer size for in-memory sort int32_t sourceId;
SArray *orderInfo; // SArray<SBlockOrderInfo> uint32_t sortBufSize; // max buffer size for in-memory sort
SSDataBlock *pDataBlock; SArray* orderInfo; // SArray<SBlockOrderInfo>
bool nullFirst; // null value is put in the front SSDataBlock* pDataBlock;
bool nullFirst; // null value is put in the front
bool hasVarCol; // has variable length column, such as binary/varchar/nchar
SDiskbasedBuf* pSortInternalBuf;
int32_t numOfSources;
int32_t numOfCompleted;
SLoserTreeInfo *pMergeTree;
SArray *pSources; // SArray<SExternalMemSource*>
} SOrderOperatorInfo; } SOrderOperatorInfo;
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger); int32_t numOfRows, void* merger);
SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo,
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); bool groupResultMixedUp);
SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SOrder* pOrderVal); int32_t numOfOutput);
SOperatorInfo* createMergeSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal); SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal);
SOperatorInfo* createMergeSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput,
SOrder* pOrderVal);
//SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); // SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
//SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); // SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
//SSDataBlock* doSLimit(void* param, bool* newgroup); // SSDataBlock* doSLimit(void* param, bool* newgroup);
//int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); // int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p); bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p);
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
@ -606,58 +631,64 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo,
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows); int32_t* rowCellInfoOffset);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOfInputRows);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, SSqlExpr** pExprMsg,
SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo); SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo);
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr **pExpr, SExprInfo *prevExpr, struct SUdfInfo *pUdfInfo); SSqlExpr** pExpr, SExprInfo* prevExpr, struct SUdfInfo* pUdfInfo);
int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters); int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters);
SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code); SGroupbyExpr* createGroupbyExprFromMsg(SQueryTableReq* pQueryMsg, SColIndex* pColIndex, int32_t* code);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
int32_t prevResultLen, void* merger); int32_t prevResultLen, void* merger);
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId); int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf); STableQueryInfo* createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win,
void* buf);
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win); STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg); int32_t buildArithmeticExprFromMsg(SExprInfo* pArithExprInfo, void* pQueryMsg);
bool isTaskKilled(SExecTaskInfo *pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool checkNeedToCompressQueryCol(SQInfo *pQInfo); bool checkNeedToCompressQueryCol(SQInfo* pQInfo);
void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status); void setQueryStatus(STaskRuntimeEnv* pRuntimeEnv, int8_t status);
bool onlyQueryTags(STaskAttr* pQueryAttr); bool onlyQueryTags(STaskAttr* pQueryAttr);
//void destroyUdfInfo(struct SUdfInfo* pUdfInfo); // void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen); int32_t doDumpQueryResult(SQInfo* pQInfo, char* data, int8_t compressed, int32_t* compLen);
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); size_t getResultSize(SQInfo* pQInfo, int64_t* numOfRows);
void setTaskKilled(SExecTaskInfo *pTaskInfo); void setTaskKilled(SExecTaskInfo* pTaskInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SExecTaskInfo * pTaskInfo, int32_t code); void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo); void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SExecTaskInfo *pTaskInfo); void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo *pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void freeQueryAttr(STaskAttr *pQuery); void freeQueryAttr(STaskAttr* pQuery);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx *pCtx, int32_t idx, int32_t type); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_EXECUTORIMPL_H #endif // TDENGINE_EXECUTORIMPL_H

View File

@ -141,7 +141,7 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_
return; return;
} }
// the result does not put into the SDiskbasedResultBuf, ignore it. // the result does not put into the SDiskbasedBuf, ignore it.
if (pResultRow->pageId >= 0) { if (pResultRow->pageId >= 0) {
SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId);

View File

@ -693,7 +693,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, i
} }
// a new buffer page for each table. Needs to opt this design // a new buffer page for each table. Needs to opt this design
static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid, uint32_t size) { static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pResultBuf, int32_t tid, uint32_t size) {
if (pWindowRes->pageId != -1) { if (pWindowRes->pageId != -1) {
return 0; return 0;
} }
@ -708,10 +708,10 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
pData = getNewDataBuf(pResultBuf, tid, &pageId); pData = getNewDataBuf(pResultBuf, tid, &pageId);
} else { } else {
SPageInfo* pi = getLastPageInfo(list); SPageInfo* pi = getLastPageInfo(list);
pData = getResBufPage(pResultBuf, pi->pageId); pData = getResBufPage(pResultBuf, getPageId(pi));
pageId = pi->pageId; pageId = getPageId(pi);
if (pData->num + size > pResultBuf->pageSize) { if (pData->num + size > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one // release current page first, and prepare the next one
releaseResBufPageInfo(pResultBuf, pi); releaseResBufPageInfo(pResultBuf, pi);
pData = getNewDataBuf(pResultBuf, tid, &pageId); pData = getNewDataBuf(pResultBuf, tid, &pageId);
@ -748,7 +748,7 @@ static int32_t setResultOutputBufByKey(STaskRuntimeEnv *pRuntimeEnv, SResultRowI
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, bool masterscan, SResultRow **pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) { int32_t numOfOutput, int32_t* rowCellInfoOffset) {
assert(win->skey <= win->ekey); assert(win->skey <= win->ekey);
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SDiskbasedBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SResultRow *pResultRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId); SResultRow *pResultRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId);
if (pResultRow == NULL) { if (pResultRow == NULL) {
@ -1755,7 +1755,7 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
} }
static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SDiskbasedBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset; int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset;
SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo; SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo;
@ -5535,6 +5535,195 @@ SOperatorInfo *createMultiwaySortOperatorInfo(STaskRuntimeEnv *pRuntimeEnv, SExp
return pOperator; return pOperator;
} }
// One input of the merge phase: a run of pages that addToDiskBasedBuf() wrote into the
// disk-based sort buffer, read back one page at a time by loadNewDataBlock().
typedef struct SExternalMemSource {
// page list returned by getDataBufPagesIdList() for this source id; loadNewDataBlock()
// reads its elements as struct SPageInfo*
SArray* pageIdList;
// position in pageIdList of the page currently decoded into pBlock; loadNewDataBlock()
// advances it and sets it to -1 once every page has been consumed
int32_t pageIndex;
// id under which the pages of this run were allocated in the sort buffer
int32_t sourceId;
// current row inside pBlock; -1 marks an exhausted source (see doMergeSortCompar())
int32_t rowIndex;
// block that blockDataFromBuf() fills from the current page
SSDataBlock *pBlock;
} SExternalMemSource;
// Context handed to doMergeSortCompar() through its opaque `param` argument.
typedef struct SCompareParam {
// sources to merge, indexed by the int32_t source indices the comparator receives
SExternalMemSource **pSources;
// presumably the number of entries in pSources; not read by doMergeSortCompar() - TODO confirm
int32_t num;
SArray *orderInfo; // SArray<SBlockOrderInfo>
// true: a NULL value is ordered before a non-NULL value; false: after it
bool nullFirst;
} SCompareParam;
/**
 * Comparator used while merging sorted sources: orders the current row of two sources.
 *
 * @param pLeft   points to the int32_t index of the left source inside SCompareParam::pSources
 * @param pRight  points to the int32_t index of the right source inside SCompareParam::pSources
 * @param param   SCompareParam* holding the sources, the sort keys and the NULL placement rule
 * @return -1 when the left row is ordered first, 1 when the right row is ordered first,
 *         0 when the two rows are equal on every sort key.
 *
 * A source whose rowIndex is -1 is exhausted and always compares greater than the other one
 * (the left source is tested first). Only TSDB_DATA_TYPE_INT keys are handled; any other
 * column type trips the assert.
 */
int32_t doMergeSortCompar(const void *pLeft, const void *pRight, void *param) {
  int32_t pLeftIdx  = *(int32_t *)pLeft;
  int32_t pRightIdx = *(int32_t *)pRight;

  SCompareParam *pParam = (SCompareParam *)param;

  SExternalMemSource **pSources = pParam->pSources;
  SArray *pInfo = pParam->orderInfo;

  // this input is exhausted, set the special value to denote this
  if (pSources[pLeftIdx]->rowIndex == -1) {
    return 1;
  }

  if (pSources[pRightIdx]->rowIndex == -1) {
    return -1;
  }

  SSDataBlock* pLeftBlock  = pSources[pLeftIdx]->pBlock;
  SSDataBlock* pRightBlock = pSources[pRightIdx]->pBlock;

  size_t num = taosArrayGetSize(pInfo);
  for (size_t i = 0; i < num; ++i) {
    SBlockOrderInfo* pOrder = taosArrayGet(pInfo, i);

    SColumnInfoData* pLeftColInfoData = taosArrayGet(pLeftBlock->pDataBlock, pOrder->colIndex);
    bool leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pSources[pLeftIdx]->rowIndex, pLeftBlock->pBlockAgg);

    SColumnInfoData* pRightColInfoData = taosArrayGet(pRightBlock->pDataBlock, pOrder->colIndex);
    bool rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pSources[pRightIdx]->rowIndex, pRightBlock->pBlockAgg);

    if (leftNull && rightNull) {
      continue; // both NULL on this key: decide on the next key
    }

    if (rightNull) {
      return pParam->nullFirst ? 1 : -1;
    }

    if (leftNull) {
      return pParam->nullFirst ? -1 : 1;
    }

    void* left1  = colDataGet(pLeftColInfoData, pSources[pLeftIdx]->rowIndex);
    void* right1 = colDataGet(pRightColInfoData, pSources[pRightIdx]->rowIndex);

    switch (pLeftColInfoData->info.type) {
      case TSDB_DATA_TYPE_INT: {
        int32_t lv = *(int32_t*) left1;
        int32_t rv = *(int32_t*) right1;
        if (lv == rv) {
          break;  // equal on this key: decide on the next key
        }

        if (pOrder->order == TSDB_ORDER_ASC) {
          return (lv < rv) ? -1 : 1;
        } else {
          return (lv < rv) ? 1 : -1;
        }
      }
      default:
        assert(0);
        break;
    }
  }

  // Every sort key compared equal. The original fell off the end of this non-void function
  // here, leaving the caller with an indeterminate result; report the tie explicitly.
  return 0;
}
/**
 * Moves a merge source to its next page and decodes that page into pSource->pBlock.
 *
 * @param pSource  source to advance; its rowIndex/pageIndex are updated in place
 * @param pInfo    sort operator state owning the page buffer and the completed-source counter
 * @return the result of blockDataFromBuf() when a page was loaded; 0 when the source has no
 *         page left, in which case rowIndex and pageIndex are both set to -1 and
 *         pInfo->numOfCompleted is incremented.
 */
int32_t loadNewDataBlock(SExternalMemSource *pSource, SOrderOperatorInfo* pInfo) {
  pSource->pageIndex += 1;
  pSource->rowIndex = 0;

  // no page left: flag the source as exhausted
  if (!(pSource->pageIndex < taosArrayGetSize(pSource->pageIdList))) {
    pInfo->numOfCompleted += 1;
    pSource->rowIndex  = -1;
    pSource->pageIndex = -1;
    return 0;
  }

  struct SPageInfo* pNextPage = *(struct SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
  SFilePage* pFilePage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pNextPage));

  return blockDataFromBuf(pSource->pBlock, pFilePage->data);
}
/**
 * Re-balances the loser tree after a row of the current winner source has been consumed.
 *
 * @param pSource  source whose row was just consumed
 * @param pTree    loser tree to adjust
 * @param pInfo    sort operator state (page buffer, number of sources)
 */
void adjustLoserTreeFromNewData(SExternalMemSource *pSource, SLoserTreeInfo *pTree, SOrderOperatorInfo* pInfo) {
  // The last row held in memory for this source was picked as the winner, so the next
  // page of the source has to be brought in before the tree is adjusted.
  if (pSource->pBlock->info.rows <= pSource->rowIndex) {
    // TODO check if has remain pages.
    loadNewDataBlock(pSource, pInfo);
  }

  // Leaf slot of the current winner: winner index stored in node 0, offset by the source count.
  int32_t winnerLeaf = pInfo->numOfSources + pTree->pNode[0].index;

#ifdef _DEBUG_VIEW
  printf("before adjust:\t");
  tLoserTreeDisplay(pTree);
#endif

  tLoserTreeAdjust(pTree, winnerLeaf);

#ifdef _DEBUG_VIEW
  printf("\nafter adjust:\t");
  tLoserTreeDisplay(pTree);
  printf("\n");
#endif
}
/**
 * Copies row *rowIndex of pSource, column by column, to the end of pBlock.
 *
 * @param pBlock    destination block; its row count grows by one
 * @param pSource   block the row is read from
 * @param rowIndex  in/out: row to copy, incremented by one on return
 */
static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
  int32_t col = 0;
  while (col < pBlock->info.numOfCols) {
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, col);
    SColumnInfoData* pSrcCol = taosArrayGet(pSource->pDataBlock, col);

    bool  nullValue = colDataIsNull(pSrcCol, pSource->info.rows, *rowIndex, NULL);
    char* pValue    = colDataGet(pSrcCol, *rowIndex);

    colDataAppend(pDstCol, pBlock->info.rows, pValue, nullValue);
    ++col;
  }

  ++pBlock->info.rows;
  ++(*rowIndex);
}
/*
 * Move the rows buffered in pInfo->pDataBlock into the disk-based sort buffer and register them as
 * a new merge source.
 *
 * The block is cut into page-sized pieces; every piece is serialized into a freshly allocated page
 * that belongs to group pInfo->sourceId. Afterwards the in-memory block is emptied, the source id
 * is advanced, and a SExternalMemSource describing the pages just written is appended to
 * pInfo->pSources.
 */
void addToDiskBasedBuf(SOrderOperatorInfo* pInfo) {
  int32_t start = 0;
  while(start < pInfo->pDataBlock->info.rows) {
    int32_t stop = 0;
    blockDataSplitRows(pInfo->pDataBlock, pInfo->hasVarCol, start, &stop, getBufPageSize(pInfo->pSortInternalBuf));

    // NOTE(review): p looks like a block owned by the caller and it is never freed here - confirm
    // and destroy it once a suitable destructor is available.
    SSDataBlock* p = blockDataExtractBlock(pInfo->pDataBlock, start, stop - start + 1);

    int32_t pageId = -1;
    SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId);
    blockDataToBuf(pPage->data, p);

    // The page is complete; give it back so that the buffer is allowed to evict it. Without the
    // release every flushed page stays marked as in use.
    releaseResBufPage(pInfo->pSortInternalBuf, pPage);

    start = stop + 1;
  }

  // reset the in-memory block so that it can be filled again
  int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
  pInfo->pDataBlock->info.rows = 0;

  if (pInfo->hasVarCol) {
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* p = taosArrayGet(pInfo->pDataBlock->pDataBlock, i);
      if (IS_VAR_DATA_TYPE(p->info.type)) {
        p->varmeta.length = 0;
      }
    }
  }

  pInfo->sourceId += 1;

  // TODO extract method
  // describe the pages written above as one source of the following merge
  SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource));
  pSource->pageIdList = getDataBufPagesIdList(pInfo->pSortInternalBuf, pInfo->sourceId - 1);
  pSource->sourceId = pInfo->sourceId - 1;

  // the source gets its own block with the same column layout but without any data buffers
  pSource->pBlock = calloc(1, sizeof(SSDataBlock));
  pSource->pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  pSource->pBlock->info.numOfCols = numOfCols;

  for(int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData colInfo = {0};
    SColumnInfoData* p = taosArrayGet(pInfo->pDataBlock->pDataBlock, i);
    colInfo.info = p->info;
    taosArrayPush(pSource->pBlock->pDataBlock, &colInfo);
  }

  taosArrayPush(pInfo->pSources, &pSource);
}
static SSDataBlock* doSort(void* param, bool* newgroup) { static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
@ -5542,8 +5731,8 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
} }
SOrderOperatorInfo* pInfo = pOperator->info; SOrderOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while(1) { while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup);
@ -5551,7 +5740,6 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
// start to flush data into disk and try do multiway merge sort // start to flush data into disk and try do multiway merge sort
if (pBlock == NULL) { if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
break; break;
} }
@ -5563,39 +5751,106 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
size_t size = blockDataGetSize(pInfo->pDataBlock); size_t size = blockDataGetSize(pInfo->pDataBlock);
if (size > pInfo->sortBufSize) { if (size > pInfo->sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk. // Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst); blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst);
printf("sort time:%ld\n", taosGetTimestampUs() - p);
// flush to disk // flush to disk
addToDiskBasedBuf(pInfo);
} }
} }
// int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; if (pInfo->pDataBlock->info.rows > 0) {
// void** pCols = calloc(numOfCols, POINTER_BYTES); pInfo->numOfSources += 1;
// SSchema* pSchema = calloc(numOfCols, sizeof(SSchema));
//
// for(int32_t i = 0; i < numOfCols; ++i) {
// SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i);
// pCols[i] = p1->pData;
// pSchema[i].colId = p1->info.colId;
// pSchema[i].bytes = p1->info.bytes;
// pSchema[i].type = (uint8_t) p1->info.type;
// }
// __compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order); // Perform the in-memory sort and then flush data in the buffer into disk.
// taoscQSort(pCols, pInfo->pDataBlock->info.rows, sizeof(int32_t), pInfo, comp); blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst);
// All sorted data are resident in memory, external memory sort is not needed.
// Return to the upstream operator directly
if (isAllDataInMemBuf(pInfo->pSortInternalBuf)) {
pOperator->status = OP_RES_TO_RETURN;
return (pInfo->pDataBlock->info.rows == 0)? NULL:pInfo->pDataBlock;
}
// flush to disk
addToDiskBasedBuf(pInfo);
}
SCompareParam cmpParam = {0};
cmpParam.nullFirst = pInfo->nullFirst;
cmpParam.orderInfo = pInfo->orderInfo;
cmpParam.num = pInfo->numOfSources;
cmpParam.pSources = pInfo->pSources->pData;
pInfo->numOfSources = taosArrayGetSize(pInfo->pSources);
for(int32_t i = 0; i < pInfo->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam.pSources[i];
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo));
int32_t code = blockDataFromBuf(cmpParam.pSources[i]->pBlock, pPage->data);
}
int32_t code = tLoserTreeCreate(&pInfo->pMergeTree, pInfo->numOfSources, &cmpParam, doMergeSortCompar);
while(1) {
if (pInfo->numOfSources == pInfo->numOfCompleted) {
break;
}
SExternalMemSource *pSource = cmpParam.pSources[pInfo->pMergeTree->pNode[0].index];
appendOneRowToDataBlock(pInfo->pDataBlock, pSource->pBlock, &pSource->rowIndex);
adjustLoserTreeFromNewData(pSource, pInfo->pMergeTree, pInfo);
if (pInfo->pDataBlock->info.rows >= 4096) {
return pInfo->pDataBlock;
}
}
// tfree(pCols);
// tfree(pSchema);
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
} }
SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SOrder* pOrderVal) { static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
SArray* pOrderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
size_t numOfOrder = taosArrayGetSize(pOrderVal);
for (int32_t j = 0; j < numOfOrder; ++j) {
SBlockOrderInfo orderInfo = {0};
SOrder* pOrder = taosArrayGet(pOrderVal, j);
orderInfo.order = pOrder->order;
for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
SExprInfo* pExpr = taosArrayGet(pExprInfo, i);
if (pExpr->base.resSchema.colId == pOrder->col.info.colId) {
orderInfo.colIndex = i;
break;
}
}
taosArrayPush(pOrderInfo, &orderInfo);
}
return pOrderInfo;
}
SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
pInfo->sortBufSize = 1024 * 1024; // 1MB pInfo->sortBufSize = 1024 * 1024; // 1MB
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, 4096); pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, 4096);
pInfo->orderInfo = taosArrayInit(1, sizeof(SOrder)); pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal);
taosArrayPush(pInfo->orderInfo, pOrderVal); // todo more than one order column pInfo->pSources = taosArrayInit(4, POINTER_BYTES);
for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
if (IS_VAR_DATA_TYPE(pExpr->base.resSchema.type)) {
pInfo->hasVarCol = true;
break;
}
}
int32_t code = createDiskbasedResultBuffer(&pInfo->pSortInternalBuf, 4096, 4096*1000, 1, "/tmp/");
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "Order"; pOperator->name = "Order";

View File

@ -33,6 +33,83 @@
#include "stub.h" #include "stub.h"
#include "executor.h" #include "executor.h"
namespace {
// State of the dummy upstream operator that feeds generated blocks to the operator under test.
typedef struct SDummyInputInfo {
int32_t max;      // number of blocks to hand out; getDummyBlock() returns NULL once current reaches it
int32_t current;  // number of blocks handed out so far
int32_t startVal; // decremented before every generated row and used as that row's integer value
} SDummyInputInfo;
// exec() callback of the dummy operator: builds a new block of 1000 rows on every call until the
// configured number of blocks has been handed out, then returns NULL.
// Column 0 is an INT column filled with strictly decreasing values, column 1 is a BINARY(40)
// column holding "this is <n> row".
SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
  SOperatorInfo*   pOperator = static_cast<SOperatorInfo*>(param);
  SDummyInputInfo* pInfo     = static_cast<SDummyInputInfo*>(pOperator->info);

  if (pInfo->current >= pInfo->max) {
    return NULL;
  }

  SSDataBlock* pBlock = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
  assert(pBlock != NULL);
  pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));

  int32_t numOfRows = 1000;

  // integer column
  SColumnInfoData intCol = {0};
  intCol.info.colId = 1;
  intCol.info.type  = TSDB_DATA_TYPE_INT;
  intCol.info.bytes = sizeof(int32_t);
  intCol.pData      = static_cast<char*>(calloc(numOfRows, sizeof(int32_t)));
  intCol.nullbitmap = static_cast<char*>(calloc(1, (numOfRows + 7) / 8));
  taosArrayPush(pBlock->pDataBlock, &intCol);

  // variable length column, only the offset array is allocated up front
  SColumnInfoData binCol = {0};
  binCol.info.colId = 2;
  binCol.info.type  = TSDB_DATA_TYPE_BINARY;
  binCol.info.bytes = 40;
  binCol.varmeta.length   = 0;
  binCol.varmeta.allocLen = 0;
  binCol.varmeta.offset   = static_cast<int32_t*>(calloc(1, numOfRows * sizeof(int32_t)));
  taosArrayPush(pBlock->pDataBlock, &binCol);

  // the array is not modified any more, so the column handles can be fetched once
  SColumnInfoData* pIntCol = static_cast<SColumnInfoData*>(taosArrayGet(pBlock->pDataBlock, 0));
  SColumnInfoData* pBinCol = static_cast<SColumnInfoData*>(taosArrayGet(pBlock->pDataBlock, 1));

  char text[128]   = {0};
  char varstr[128] = {0};

  for (int32_t row = 0; row < numOfRows; ++row) {
    pInfo->startVal -= 1;
    int32_t v = pInfo->startVal;
    colDataAppend(pIntCol, row, reinterpret_cast<const char*>(&v), false);

    sprintf(text, "this is %d row", row);
    STR_TO_VARSTR(varstr, text);
    colDataAppend(pBinCol, row, varstr, false);
  }

  pBlock->info.numOfCols = 2;
  pBlock->info.rows      = numOfRows;

  pInfo->current += 1;
  return pBlock;
}
// Create an operator that produces numOfBlocks generated blocks through getDummyBlock().
// The integer values it generates count downwards from 100000.
SOperatorInfo* createDummyOperator(int32_t numOfBlocks) {
  SOperatorInfo* pDummyOp = static_cast<SOperatorInfo*>(calloc(1, sizeof(SOperatorInfo)));
  pDummyOp->exec = getDummyBlock;
  pDummyOp->name = "dummyInputOpertor4Test";

  SDummyInputInfo* pState = static_cast<SDummyInputInfo*>(calloc(1, sizeof(SDummyInputInfo)));
  pState->startVal = 100000;
  pState->max      = numOfBlocks;

  pDummyOp->info = pState;
  return pDummyOp;
}
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv); testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
@ -128,9 +205,66 @@ TEST(testCase, build_executor_tree_Test) {
SExecTaskInfo* pTaskInfo = nullptr; SExecTaskInfo* pTaskInfo = nullptr;
DataSinkHandle sinkHandle = nullptr; DataSinkHandle sinkHandle = nullptr;
SReadHandle handle = {.reader = NULL, .meta = NULL}; SReadHandle handle = {.reader = reinterpret_cast<void*>(0x1), .meta = reinterpret_cast<void*>(0x1)};
int32_t code = qCreateExecTask(&handle, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); // int32_t code = qCreateExecTask(&handle, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle);
} }
//TEST(testCase, inMem_sort_Test) {
// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
// SOrder o = {.order = TSDB_ORDER_ASC};
// o.col.info.colId = 1;
// o.col.info.type = TSDB_DATA_TYPE_INT;
// taosArrayPush(pOrderVal, &o);
//
// SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo));
// SExprInfo *exp = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
// exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res");
// taosArrayPush(pExprInfo, &exp);
//
// SExprInfo *exp1 = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
// exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1);
//
// SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(5), pExprInfo, pOrderVal);
//
// bool newgroup = false;
// SSDataBlock* pRes = pOperator->exec(pOperator, &newgroup);
//
// SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
// SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
// for(int32_t i = 0; i < pRes->info.rows; ++i) {
// char* p = colDataGet(pCol2, i);
// printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
// }
//}
// Feeds 100 generated blocks (1000 rows each) into the order operator, sorted ascending on the
// integer column (colId 1), and prints the first block the operator returns.
TEST(testCase, external_sort_Test) {
  SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
  SOrder o = {.order = TSDB_ORDER_ASC};
  o.col.info.colId = 1;
  o.col.info.type = TSDB_DATA_TYPE_INT;
  taosArrayPush(pOrderVal, &o);

  // NOTE(review): the array is created with sizeof(SExprInfo) sized elements while the values
  // pushed below are SExprInfo* pointers (&exp / &exp1) - confirm which element layout the
  // operator expects.
  SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo));
  SExprInfo *exp = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
  exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res");
  taosArrayPush(pExprInfo, &exp);

  SExprInfo *exp1 = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
  exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
  taosArrayPush(pExprInfo, &exp1);

  SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(100), pExprInfo, pOrderVal);

  bool newgroup = false;
  SSDataBlock* pRes = pOperator->exec(pOperator, &newgroup);

  // the operator returns NULL when it has no row to deliver; fail here instead of crashing below
  ASSERT_TRUE(pRes != NULL);

  SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
  SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
  for(int32_t i = 0; i < pRes->info.rows; ++i) {
    char* p = colDataGet(pCol2, i);
    printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
  }
}
#pragma GCC diagnostic pop #pragma GCC diagnostic pop

View File

@ -63,7 +63,7 @@ typedef struct tMemBucket {
__compar_fn_t comparFn; __compar_fn_t comparFn;
tMemBucketSlot * pSlots; tMemBucketSlot * pSlots;
SDiskbasedResultBuf *pBuffer; SDiskbasedBuf *pBuffer;
__perc_hash_func_t hashFunc; __perc_hash_func_t hashFunc;
} tMemBucket; } tMemBucket;

View File

@ -35,9 +35,9 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
int32_t offset = 0; int32_t offset = 0;
for(int32_t i = 0; i < list->size; ++i) { for(int32_t i = 0; i < list->size; ++i) {
SPageInfo* pgInfo = *(SPageInfo**) taosArrayGet(list, i); struct SPageInfo* pgInfo = *(struct SPageInfo**) taosArrayGet(list, i);
SFilePage* pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); SFilePage* pg = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo));
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
offset += (int32_t)(pg->num * pMemBucket->bytes); offset += (int32_t)(pg->num * pMemBucket->bytes);
@ -98,8 +98,8 @@ double findOnlyResult(tMemBucket *pMemBucket) {
SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId);
assert(list->size == 1); assert(list->size == 1);
SPageInfo* pgInfo = (SPageInfo*) taosArrayGetP(list, 0); struct SPageInfo* pgInfo = (struct SPageInfo*) taosArrayGetP(list, 0);
SFilePage* pPage = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); SFilePage* pPage = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo));
assert(pPage->num == 1); assert(pPage->num == 1);
double v = 0; double v = 0;
@ -471,7 +471,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
for (int32_t f = 0; f < list->size; ++f) { for (int32_t f = 0; f < list->size; ++f) {
SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f); SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f);
SFilePage *pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); SFilePage *pg = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo));
tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);
releaseResBufPageInfo(pMemBucket->pBuffer, pgInfo); releaseResBufPageInfo(pMemBucket->pBuffer, pgInfo);

View File

@ -39,8 +39,8 @@ void tLoserTreeDisplay(SLoserTreeInfo* pTree) {
printf("\n"); printf("\n");
} }
uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* param, __merge_compare_fn_t compareFn) { int32_t tLoserTreeCreate(SLoserTreeInfo** pTree, uint32_t numOfSources, void* param, __merge_compare_fn_t compareFn) {
int32_t totalEntries = numOfEntries << 1; int32_t totalEntries = numOfSources << 1u;
*pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries); *pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries);
if ((*pTree) == NULL) { if ((*pTree) == NULL) {
@ -50,7 +50,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa
(*pTree)->pNode = (SLoserTreeNode*)(((char*)(*pTree)) + sizeof(SLoserTreeInfo)); (*pTree)->pNode = (SLoserTreeNode*)(((char*)(*pTree)) + sizeof(SLoserTreeInfo));
(*pTree)->numOfEntries = numOfEntries; (*pTree)->numOfEntries = numOfSources;
(*pTree)->totalEntries = totalEntries; (*pTree)->totalEntries = totalEntries;
(*pTree)->param = param; (*pTree)->param = param;
(*pTree)->comparFn = compareFn; (*pTree)->comparFn = compareFn;
@ -63,7 +63,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa
tLoserTreeDisplay(*pTree); tLoserTreeDisplay(*pTree);
#endif #endif
for (int32_t i = totalEntries - 1; i >= numOfEntries; i--) { for (int32_t i = totalEntries - 1; i >= numOfSources; i--) {
tLoserTreeAdjust(*pTree, i); tLoserTreeAdjust(*pTree, i);
} }

View File

@ -7,10 +7,58 @@
#define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES) #define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { typedef struct SFreeListItem {
*pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf)); int32_t offset;
int32_t len;
} SFreeListItem;
SDiskbasedResultBuf* pResBuf = *pResultBuf; typedef struct SPageDiskInfo {
int32_t offset;
int32_t length;
} SPageDiskInfo;
// Bookkeeping record of a single page managed by the disk-based buffer.
typedef struct SPageInfo {
SListNode* pn; // point to list node
void* pData; // page memory; callers receive the address POINTER_BYTES past it (see GET_DATA_PAYLOAD)
int32_t pageId; // id of the page, exposed to other modules through getPageId()
SPageDiskInfo info; // offset and length of the page in the backing file, used to read it back
bool used; // true while the page is handed out; cleared by releaseResBufPageInfo()
} SPageInfo;
// Activity counters of one SDiskbasedBuf.
typedef struct SDiskbasedBufStatis {
int32_t flushBytes; // presumably the number of bytes written to the backing file - TODO confirm
int32_t loadBytes; // presumably the number of bytes read back from the backing file - TODO confirm
int32_t getPages; // incremented on every getNewDataBuf() and getResBufPage() call
int32_t releasePages; // incremented on every releaseResBufPageInfo() call
int32_t flushPages; // presumably the number of pages written to the backing file - TODO confirm
} SDiskbasedBufStatis;
// A paged buffer that keeps a bounded number of pages in memory and uses a file for the rest.
typedef struct SDiskbasedBuf {
int32_t numOfPages; // numOfPages * pageSize is asserted to equal totalBufSize when a page is flushed
int64_t totalBufSize; // value reported by getResBufSize()
int64_t fileSize; // disk file size; isAllDataInMemBuf() reports true while it is 0
FILE* file; // backing file, opened from path in "wb+" mode by createDiskFile()
int32_t allocateId; // allocated page id
char* path; // file path
int32_t pageSize; // current used page size, reported by getBufPageSize()
int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* groupSet; // id hash table, looked up with the int32_t group id to find the pages of a group
SHashObj* all; // NOTE(review): presumably indexes every page by its id - confirm
SList* lruList; // pages currently in memory; scanned backwards to find a page that is not in use
void* emptyDummyIdList; // dummy id list
void* assistBuf; // assistant buffer for compress/decompress data
SArray* pFree; // free area in file; while NULL, new file positions are taken from nextPos
bool comp; // compressed before flushed to disk; when false data is written and read unchanged
int32_t nextPos; // next page flush position
uint64_t qId; // for debug purpose
SDiskbasedBufStatis statis; // activity counters
} SDiskbasedBuf;
int32_t createDiskbasedResultBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) {
*pResultBuf = calloc(1, sizeof(SDiskbasedBuf));
SDiskbasedBuf* pResBuf = *pResultBuf;
if (pResBuf == NULL) { if (pResBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -47,7 +95,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t createDiskFile(SDiskbasedResultBuf* pResultBuf) { static int32_t createDiskFile(SDiskbasedBuf* pResultBuf) {
pResultBuf->file = fopen(pResultBuf->path, "wb+"); pResultBuf->file = fopen(pResultBuf->path, "wb+");
if (pResultBuf->file == NULL) { if (pResultBuf->file == NULL) {
// qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno)); // qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
@ -57,7 +105,7 @@ static int32_t createDiskFile(SDiskbasedResultBuf* pResultBuf) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedResultBuf* pResultBuf) { // do nothing static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pResultBuf) { // do nothing
if (!pResultBuf->comp) { if (!pResultBuf->comp) {
*dst = srcSize; *dst = srcSize;
return data; return data;
@ -69,7 +117,7 @@ static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbase
return data; return data;
} }
static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedResultBuf* pResultBuf) { // do nothing static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pResultBuf) { // do nothing
if (!pResultBuf->comp) { if (!pResultBuf->comp) {
*dst = srcSize; *dst = srcSize;
return data; return data;
@ -82,7 +130,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskba
return data; return data;
} }
static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t size) { static int32_t allocatePositionInFile(SDiskbasedBuf* pResultBuf, size_t size) {
if (pResultBuf->pFree == NULL) { if (pResultBuf->pFree == NULL) {
return pResultBuf->nextPos; return pResultBuf->nextPos;
} else { } else {
@ -105,7 +153,7 @@ static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t si
} }
} }
static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) {
assert(!pg->used && pg->pData != NULL); assert(!pg->used && pg->pData != NULL);
int32_t size = -1; int32_t size = -1;
@ -163,7 +211,7 @@ static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
return ret; return ret;
} }
static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { static char* flushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
assert(((int64_t) pResultBuf->numOfPages * pResultBuf->pageSize) == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages); assert(((int64_t) pResultBuf->numOfPages * pResultBuf->pageSize) == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages);
@ -178,7 +226,7 @@ static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
} }
// load file block data in disk // load file block data in disk
static char* loadPageFromDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { static char* loadPageFromDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) {
int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET); int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET);
ret = (int32_t)fread(GET_DATA_PAYLOAD(pg), 1, pg->info.length, pResultBuf->file); ret = (int32_t)fread(GET_DATA_PAYLOAD(pg), 1, pg->info.length, pResultBuf->file);
if (ret != pg->info.length) { if (ret != pg->info.length) {
@ -194,7 +242,7 @@ static char* loadPageFromDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
return (char*)GET_DATA_PAYLOAD(pg); return (char*)GET_DATA_PAYLOAD(pg);
} }
static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { static SIDList addNewGroup(SDiskbasedBuf* pResultBuf, int32_t groupId) {
assert(taosHashGet(pResultBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL); assert(taosHashGet(pResultBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL);
SArray* pa = taosArrayInit(1, POINTER_BYTES); SArray* pa = taosArrayInit(1, POINTER_BYTES);
@ -204,7 +252,7 @@ static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
return pa; return pa;
} }
static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) { static SPageInfo* registerPage(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t pageId) {
SIDList list = NULL; SIDList list = NULL;
char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
@ -227,7 +275,7 @@ static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId,
return *(SPageInfo**) taosArrayPush(list, &ppi); return *(SPageInfo**) taosArrayPush(list, &ppi);
} }
static SListNode* getEldestUnrefedPage(SDiskbasedResultBuf* pResultBuf) { static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pResultBuf) {
SListIter iter = {0}; SListIter iter = {0};
tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_BACKWARD); tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_BACKWARD);
@ -246,7 +294,7 @@ static SListNode* getEldestUnrefedPage(SDiskbasedResultBuf* pResultBuf) {
return pn; return pn;
} }
static char* evicOneDataPage(SDiskbasedResultBuf* pResultBuf) { static char* evicOneDataPage(SDiskbasedBuf* pResultBuf) {
char* bufPage = NULL; char* bufPage = NULL;
SListNode* pn = getEldestUnrefedPage(pResultBuf); SListNode* pn = getEldestUnrefedPage(pResultBuf);
@ -290,7 +338,7 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
return pageSize + POINTER_BYTES + 2 + sizeof(SFilePage); return pageSize + POINTER_BYTES + 2 + sizeof(SFilePage);
} }
SFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
pResultBuf->statis.getPages += 1; pResultBuf->statis.getPages += 1;
char* availablePage = NULL; char* availablePage = NULL;
@ -327,7 +375,7 @@ SFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
return (void *)(GET_DATA_PAYLOAD(pi)); return (void *)(GET_DATA_PAYLOAD(pi));
} }
SFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) {
assert(pResultBuf != NULL && id >= 0); assert(pResultBuf != NULL && id >= 0);
pResultBuf->statis.getPages += 1; pResultBuf->statis.getPages += 1;
@ -373,7 +421,7 @@ SFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
} }
} }
void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) { void releaseResBufPage(SDiskbasedBuf* pResultBuf, void* page) {
assert(pResultBuf != NULL && page != NULL); assert(pResultBuf != NULL && page != NULL);
char* p = (char*) page - POINTER_BYTES; char* p = (char*) page - POINTER_BYTES;
@ -381,18 +429,18 @@ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) {
releaseResBufPageInfo(pResultBuf, ppi); releaseResBufPageInfo(pResultBuf, ppi);
} }
void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi) { void releaseResBufPageInfo(SDiskbasedBuf* pResultBuf, SPageInfo* pi) {
assert(pi->pData != NULL && pi->used); assert(pi->pData != NULL && pi->used);
pi->used = false; pi->used = false;
pResultBuf->statis.releasePages += 1; pResultBuf->statis.releasePages += 1;
} }
size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); } size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); }
size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; } size_t getResBufSize(const SDiskbasedBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; }
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId) {
assert(pResultBuf != NULL); assert(pResultBuf != NULL);
char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
@ -403,7 +451,7 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId)
} }
} }
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) { void destroyResultBuf(SDiskbasedBuf* pResultBuf) {
if (pResultBuf == NULL) { if (pResultBuf == NULL) {
return; return;
} }
@ -444,8 +492,23 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
tfree(pResultBuf); tfree(pResultBuf);
} }
SPageInfo* getLastPageInfo(SIDList pList) { struct SPageInfo* getLastPageInfo(SIDList pList) {
size_t size = taosArrayGetSize(pList); size_t size = taosArrayGetSize(pList);
return (SPageInfo*) taosArrayGetP(pList, size - 1); SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1);
return pPgInfo;
} }
// Return the id stored in the given page record; the record must not be NULL.
int32_t getPageId(const struct SPageInfo* pPgInfo) {
  ASSERT(pPgInfo != NULL);

  const int32_t id = pPgInfo->pageId;
  return id;
}
// Return the page size, in bytes, that the given buffer works with.
int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf) {
  const int32_t bytesPerPage = pResultBuf->pageSize;
  return bytesPerPage;
}
// Tell whether the buffer still holds everything in memory, i.e. its disk file size is zero.
bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf) {
  if (pResultBuf->fileSize == 0) {
    return true;
  }

  return false;
}