[td-11818]refactor sort operator.
This commit is contained in:
parent
43b0b23cd3
commit
ab45cd6611
|
@ -622,7 +622,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
|
|||
|
||||
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
|
||||
|
||||
void* destroyOutputBuf(SSDataBlock* pBlock);
|
||||
void* destroySDataBlock(SSDataBlock* pBlock);
|
||||
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
|
||||
|
||||
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
||||
|
|
|
@ -336,7 +336,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
|
|||
return res;
|
||||
}
|
||||
|
||||
void* destroyOutputBuf(SSDataBlock* pBlock) {
|
||||
void* destroySDataBlock(SSDataBlock* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -5373,7 +5373,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
|
||||
taosArrayDestroy(pInfo->orderColumnList);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = destroySDataBlock(pInfo->pRes);
|
||||
tfree(pInfo->prevRow);
|
||||
}
|
||||
|
||||
|
@ -6566,7 +6566,7 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
|
|||
tfree(pInfo->rowCellInfoOffset);
|
||||
|
||||
cleanupResultRowInfo(&pInfo->resultRowInfo);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = destroySDataBlock(pInfo->pRes);
|
||||
}
|
||||
|
||||
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
|
@ -6590,7 +6590,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
|
||||
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = destroySDataBlock(pInfo->pRes);
|
||||
tfree(pInfo->p);
|
||||
}
|
||||
|
||||
|
@ -6607,12 +6607,12 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
|
||||
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
STagScanInfo* pInfo = (STagScanInfo*) param;
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = destroySDataBlock(pInfo->pRes);
|
||||
}
|
||||
|
||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
|
||||
pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock);
|
||||
pInfo->pDataBlock = destroySDataBlock(pInfo->pDataBlock);
|
||||
}
|
||||
|
||||
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
|
@ -6625,7 +6625,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosHashCleanup(pInfo->pSet);
|
||||
tfree(pInfo->buf);
|
||||
taosArrayDestroy(pInfo->pDistinctDataInfo);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = destroySDataBlock(pInfo->pRes);
|
||||
}
|
||||
|
||||
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
|
|
|
@ -16,31 +16,19 @@
|
|||
#ifndef TDENGINE_COMMON_H
|
||||
#define TDENGINE_COMMON_H
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "taosdef.h"
|
||||
#include "tmsg.h"
|
||||
#include "tarray.h"
|
||||
#include "tvariant.h"
|
||||
//typedef struct STimeWindow {
|
||||
// TSKEY skey;
|
||||
// TSKEY ekey;
|
||||
//} STimeWindow;
|
||||
|
||||
//typedef struct {
|
||||
// int32_t dataLen;
|
||||
// char name[TSDB_TABLE_FNAME_LEN];
|
||||
// char *data;
|
||||
//} STagData;
|
||||
|
||||
//typedef struct SSchema {
|
||||
// uint8_t type;
|
||||
// char name[TSDB_COL_NAME_LEN];
|
||||
// int16_t colId;
|
||||
// int16_t bytes;
|
||||
//} SSchema;
|
||||
|
||||
typedef struct {
|
||||
uint32_t numOfTables;
|
||||
SArray *pGroupList;
|
||||
SArray * pGroupList;
|
||||
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
|
||||
} STableGroupInfo;
|
||||
|
||||
|
@ -63,7 +51,7 @@ typedef struct SDataBlockInfo {
|
|||
|
||||
typedef struct SConstantItem {
|
||||
SColumnInfo info;
|
||||
int32_t startRow; // run-length-encoding to save the space for multiple rows
|
||||
int32_t startRow; // run-length-encoding to save the space for multiple rows
|
||||
int32_t endRow;
|
||||
SVariant value;
|
||||
} SConstantItem;
|
||||
|
@ -71,58 +59,67 @@ typedef struct SConstantItem {
|
|||
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
|
||||
typedef struct SSDataBlock {
|
||||
SColumnDataAgg *pBlockAgg;
|
||||
SArray *pDataBlock; // SArray<SColumnInfoData>
|
||||
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
|
||||
SDataBlockInfo info;
|
||||
SArray * pDataBlock; // SArray<SColumnInfoData>
|
||||
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
|
||||
SDataBlockInfo info;
|
||||
} SSDataBlock;
|
||||
|
||||
typedef struct SVarColAttr {
|
||||
int32_t *offset; // start position for each entry in the list
|
||||
uint32_t length; // used buffer size that contain the valid data
|
||||
uint32_t allocLen; // allocated buffer size
|
||||
} SVarColAttr;
|
||||
|
||||
// pBlockAgg->numOfNull == info.rows, all data are null
|
||||
// pBlockAgg->numOfNull == 0, no data are null.
|
||||
typedef struct SColumnInfoData {
|
||||
SColumnInfo info; // TODO filter info needs to be removed
|
||||
char *nullbitmap;//
|
||||
char *pData; // the corresponding block data in memory
|
||||
SColumnInfo info; // TODO filter info needs to be removed
|
||||
char *pData; // the corresponding block data in memory
|
||||
union {
|
||||
char *nullbitmap; // bitmap, one bit for each item in the list
|
||||
SVarColAttr varmeta;
|
||||
};
|
||||
} SColumnInfoData;
|
||||
|
||||
//======================================================================================================================
|
||||
// the following structure shared by parser and executor
|
||||
typedef struct SColumn {
|
||||
uint64_t uid;
|
||||
char name[TSDB_COL_NAME_LEN];
|
||||
int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
|
||||
SColumnInfo info;
|
||||
uint64_t uid;
|
||||
char name[TSDB_COL_NAME_LEN];
|
||||
int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string)
|
||||
SColumnInfo info;
|
||||
} SColumn;
|
||||
|
||||
typedef struct SLimit {
|
||||
int64_t limit;
|
||||
int64_t offset;
|
||||
int64_t limit;
|
||||
int64_t offset;
|
||||
} SLimit;
|
||||
|
||||
typedef struct SOrder {
|
||||
uint32_t order;
|
||||
SColumn col;
|
||||
uint32_t order;
|
||||
SColumn col;
|
||||
} SOrder;
|
||||
|
||||
typedef struct SGroupbyExpr {
|
||||
SArray* columnInfo; // SArray<SColIndex>, group by columns information
|
||||
bool groupbyTag; // group by tag or column
|
||||
SArray *columnInfo; // SArray<SColIndex>, group by columns information
|
||||
bool groupbyTag; // group by tag or column
|
||||
} SGroupbyExpr;
|
||||
|
||||
// the structure for sql function in select clause
|
||||
typedef struct SSqlExpr {
|
||||
char token[TSDB_COL_NAME_LEN]; // original token
|
||||
SSchema resSchema;
|
||||
char token[TSDB_COL_NAME_LEN]; // original token
|
||||
SSchema resSchema;
|
||||
|
||||
int32_t numOfCols;
|
||||
SColumn* pColumns; // data columns that are required by query
|
||||
int32_t interBytes; // inter result buffer size
|
||||
int16_t numOfParams; // argument value of each function
|
||||
SVariant param[3]; // parameters are not more than 3
|
||||
int32_t numOfCols;
|
||||
SColumn *pColumns; // data columns that are required by query
|
||||
int32_t interBytes; // inter result buffer size
|
||||
int16_t numOfParams; // argument value of each function
|
||||
SVariant param[3]; // parameters are not more than 3
|
||||
} SSqlExpr;
|
||||
|
||||
typedef struct SExprInfo {
|
||||
struct SSqlExpr base;
|
||||
struct tExprNode *pExpr;
|
||||
struct SSqlExpr base;
|
||||
struct tExprNode *pExpr;
|
||||
} SExprInfo;
|
||||
|
||||
typedef struct SStateWindow {
|
||||
|
@ -130,13 +127,19 @@ typedef struct SStateWindow {
|
|||
} SStateWindow;
|
||||
|
||||
typedef struct SSessionWindow {
|
||||
int64_t gap; // gap between two session window(in microseconds)
|
||||
int64_t gap; // gap between two session window(in microseconds)
|
||||
SColumn col;
|
||||
} SSessionWindow;
|
||||
|
||||
#define QUERY_ASC_FORWARD_STEP 1
|
||||
#define QUERY_ASC_FORWARD_STEP 1
|
||||
#define QUERY_DESC_FORWARD_STEP -1
|
||||
|
||||
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
|
||||
|
||||
void *destroySDataBlock(SSDataBlock *pBlock);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // TDENGINE_COMMON_H
|
||||
|
|
|
@ -7,12 +7,18 @@ extern "C" {
|
|||
|
||||
#include "os.h"
|
||||
#include "tmsg.h"
|
||||
#include "common.h"
|
||||
|
||||
typedef struct SCorEpSet {
|
||||
int32_t version;
|
||||
SEpSet epSet;
|
||||
} SCorEpSet;
|
||||
|
||||
typedef struct SBlockOrderInfo {
|
||||
int32_t order;
|
||||
int32_t colIndex;
|
||||
} SBlockOrderInfo;
|
||||
|
||||
int taosGetFqdnPortFromEp(const char *ep, SEp *pEp);
|
||||
void addEpIntoEpSet(SEpSet *pEpSet, const char *fqdn, uint16_t port);
|
||||
|
||||
|
@ -21,6 +27,30 @@ bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2);
|
|||
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet);
|
||||
SEpSet getEpSet_s(SCorEpSet *pEpSet);
|
||||
|
||||
bool colDataIsNull_f(const char* bitmap, uint32_t row);
|
||||
void colDataSetNull_f(char* bitmap, uint32_t row);
|
||||
|
||||
bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg);
|
||||
|
||||
char* colDataGet(SColumnInfoData* pColumnInfoData, uint32_t row);
|
||||
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
|
||||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2);
|
||||
int32_t colDataUpdateTsWindow(SSDataBlock* pDataBlock);
|
||||
|
||||
int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
|
||||
void colDataTrim(SColumnInfoData* pColumnInfoData);
|
||||
|
||||
size_t colDataGetNumOfCols(const SSDataBlock* pBlock);
|
||||
size_t colDataGetNumOfRows(const SSDataBlock* pBlock);
|
||||
|
||||
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
|
||||
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize);
|
||||
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
|
||||
|
||||
size_t blockDataGetSize(const SSDataBlock* pBlock);
|
||||
size_t blockDataGetRowSize(const SSDataBlock* pBlock);
|
||||
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -71,57 +71,215 @@ bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, u
|
|||
}
|
||||
}
|
||||
|
||||
if (pColumnInfoData->nullbitmap == NULL) {
|
||||
return false;
|
||||
}
|
||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
return pColumnInfoData->varmeta.offset[row] == -1;
|
||||
} else {
|
||||
if (pColumnInfoData->nullbitmap == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
uint8_t v = (pColumnInfoData->nullbitmap[row>>3] & (1<<(8 - (row&0x07))));
|
||||
return (v == 1);
|
||||
return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
|
||||
}
|
||||
}
|
||||
|
||||
#define NBIT (3u)
|
||||
#define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT)
|
||||
#define BitPos(_n) ((_n) & ((1<<NBIT) - 1))
|
||||
|
||||
bool colDataIsNull_f(const char* bitmap, uint32_t row) {
|
||||
return (bitmap[row>>3] & (1<<(8 - (row&0x07))));
|
||||
return (bitmap[row>>3u] & (1u<<(7u - BitPos(row)))) == (1u<<(7u - BitPos(row)));
|
||||
}
|
||||
|
||||
void colDataSetNull_f(char* bitmap, uint32_t row) { // TODO
|
||||
return;
|
||||
void colDataSetNull_f(char* bitmap, uint32_t row) {
|
||||
bitmap[row>>3u] |= (1u << (7u - BitPos(row)));
|
||||
}
|
||||
|
||||
void* colDataGet(const SColumnInfoData* pColumnInfoData, uint32_t row) {
|
||||
char* colDataGet(SColumnInfoData* pColumnInfoData, uint32_t row) {
|
||||
char* p = pColumnInfoData->pData;
|
||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
uint32_t offset = ((uint32_t*)pColumnInfoData->pData)[row];
|
||||
return (char*)(pColumnInfoData->pData) + offset; // the first part is the pointer to the true binary data
|
||||
return p + pColumnInfoData->varmeta.offset[row];
|
||||
} else {
|
||||
return (char*)(pColumnInfoData->pData) + (row * pColumnInfoData->info.bytes);
|
||||
return p + (row * pColumnInfoData->info.bytes);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t ensureBitmapSize(SColumnInfoData* pColumnInfoData, uint32_t size) {
|
||||
#if 0
|
||||
ASSERT(pColumnInfoData != NULL);
|
||||
if (pColumnInfoData->bitmapLen * 8 < size) {
|
||||
int32_t inc = pColumnInfoData->bitmapLen * 1.25;
|
||||
if (inc < 8) {
|
||||
inc = 8;
|
||||
}
|
||||
|
||||
char* tmp = realloc(pColumnInfoData->nullbitmap, inc + pColumnInfoData->bitmapLen);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pColumnInfoData->nullbitmap = tmp;
|
||||
memset(pColumnInfoData->nullbitmap + pColumnInfoData->bitmapLen, 0, inc);
|
||||
}
|
||||
#endif
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
|
||||
ASSERT(pColumnInfoData != NULL);
|
||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
return pColumnInfoData->varmeta.length;
|
||||
} else {
|
||||
return pColumnInfoData->info.bytes * numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
void colDataTrim(SColumnInfoData* pColumnInfoData) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull) {
|
||||
ASSERT(pColumnInfoData != NULL);
|
||||
|
||||
if (isNull) {
|
||||
// TODO set null value in the nullbitmap
|
||||
// There is a placehold for each NULL value of binary or nchar type.
|
||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
pColumnInfoData->varmeta.offset[currentRow] = -1; // it is a null value of VAR type.
|
||||
} else {
|
||||
colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t type = pColumnInfoData->info.type;
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
// TODO continue append var_type
|
||||
SVarColAttr* pAttr = &pColumnInfoData->varmeta;
|
||||
if (pAttr->allocLen < pAttr->length + varDataTLen(pData)) {
|
||||
uint32_t newSize = pAttr->allocLen;
|
||||
if (newSize == 0) {
|
||||
newSize = 8;
|
||||
}
|
||||
|
||||
while(newSize < pAttr->length + varDataTLen(pData)) {
|
||||
newSize = newSize * 1.5;
|
||||
}
|
||||
|
||||
char* buf = realloc(pColumnInfoData->pData, newSize);
|
||||
if (buf == NULL) {
|
||||
// TODO handle the malloc failure.
|
||||
}
|
||||
|
||||
pColumnInfoData->pData = buf;
|
||||
pAttr->allocLen = newSize;
|
||||
}
|
||||
|
||||
uint32_t len = pColumnInfoData->varmeta.length;
|
||||
pColumnInfoData->varmeta.offset[currentRow] = len;
|
||||
|
||||
memcpy(pColumnInfoData->pData + len, pData, varDataTLen(pData));
|
||||
pColumnInfoData->varmeta.length += varDataTLen(pData);
|
||||
} else {
|
||||
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
|
||||
switch(type) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_UTINYINT: {*(int8_t*) p = *(int8_t*) pData;break;}
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_USMALLINT: {*(int16_t*) p = *(int16_t*) pData;break;}
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_UINT: {*(int32_t*) p = *(int32_t*) pData;break;}
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_UBIGINT: {*(int64_t*) p = *(int64_t*) pData;break;}
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t colDataGetCols(const SSDataBlock* pBlock) {
|
||||
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource, int32_t numOfRow2) {
|
||||
uint32_t total = numOfRow1 + numOfRow2;
|
||||
|
||||
if (BitmapLen(numOfRow1) < BitmapLen(total)) {
|
||||
char* tmp = realloc(pColumnInfoData->nullbitmap, BitmapLen(total));
|
||||
uint32_t extend = BitmapLen(total) - BitmapLen(numOfRow1);
|
||||
memset(tmp + BitmapLen(numOfRow1), 0, extend);
|
||||
pColumnInfoData->nullbitmap = tmp;
|
||||
}
|
||||
|
||||
uint32_t remindBits = BitPos(numOfRow1);
|
||||
uint32_t shiftBits = 8 - remindBits;
|
||||
|
||||
if (remindBits == 0) { // no need to shift bits of bitmap
|
||||
memcpy(pColumnInfoData->nullbitmap + BitmapLen(numOfRow1), pSource->nullbitmap, BitmapLen(numOfRow2));
|
||||
} else {
|
||||
int32_t len = BitmapLen(numOfRow2);
|
||||
int32_t i = 0;
|
||||
|
||||
uint8_t* p = (uint8_t*)pSource->nullbitmap;
|
||||
pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] |= (p[0] >> remindBits);
|
||||
|
||||
uint8_t* start = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)];
|
||||
while (i < len) {
|
||||
start[i] |= (p[i] << shiftBits);
|
||||
i += 1;
|
||||
|
||||
if (i > 1) {
|
||||
start[i - 1] |= (p[i] >> remindBits);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2) {
|
||||
ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type);
|
||||
|
||||
if (numOfRow2 == 0) {
|
||||
return numOfRow1;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
// Handle the bitmap
|
||||
char* p = realloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
|
||||
if (p == NULL) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
pColumnInfoData->varmeta.offset = (int32_t*) p;
|
||||
memcpy(pColumnInfoData->varmeta.offset + sizeof(int32_t) * numOfRow1, pSource->varmeta.offset, sizeof(int32_t) * numOfRow2);
|
||||
|
||||
// copy the
|
||||
uint32_t len = pSource->varmeta.length;
|
||||
uint32_t oldLen = pColumnInfoData->varmeta.length;
|
||||
if (pColumnInfoData->varmeta.allocLen < len + oldLen) {
|
||||
char* tmp = realloc(pColumnInfoData->pData, len + oldLen);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pColumnInfoData->pData = tmp;
|
||||
pColumnInfoData->varmeta.allocLen = len + oldLen;
|
||||
}
|
||||
|
||||
memcpy(pColumnInfoData->pData + oldLen, pSource->pData + sizeof(int32_t), len);
|
||||
} else {
|
||||
doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2);
|
||||
|
||||
int32_t newSize = (numOfRow1 + numOfRow2) * pColumnInfoData->info.bytes;
|
||||
char* tmp = realloc(pColumnInfoData->pData, newSize);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pColumnInfoData->pData = tmp;
|
||||
int32_t offset = pColumnInfoData->info.bytes * numOfRow1;
|
||||
memcpy(pColumnInfoData->pData + offset, pSource->pData, pSource->info.bytes * numOfRow2);
|
||||
}
|
||||
|
||||
return numOfRow1 + numOfRow2;
|
||||
}
|
||||
|
||||
size_t colDataGetNumOfCols(const SSDataBlock* pBlock) {
|
||||
ASSERT(pBlock);
|
||||
|
||||
size_t constantCols = (pBlock->pConstantList != NULL)? taosArrayGetSize(pBlock->pConstantList):0;
|
||||
|
@ -129,7 +287,7 @@ size_t colDataGetCols(const SSDataBlock* pBlock) {
|
|||
return pBlock->info.numOfCols;
|
||||
}
|
||||
|
||||
size_t colDataGetRows(const SSDataBlock* pBlock) {
|
||||
size_t colDataGetNumOfRows(const SSDataBlock* pBlock) {
|
||||
return pBlock->info.rows;
|
||||
}
|
||||
|
||||
|
@ -153,6 +311,349 @@ int32_t colDataUpdateTsWindow(SSDataBlock* pDataBlock) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
|
||||
assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols);
|
||||
|
||||
int32_t numOfCols = pSrc->info.numOfCols;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
|
||||
|
||||
uint32_t oldLen = colDataGetSize(pCol2, pDest->info.rows);
|
||||
uint32_t newLen = colDataGetSize(pCol1, pSrc->info.rows);
|
||||
|
||||
int32_t newSize = oldLen + newLen;
|
||||
char* tmp = realloc(pCol2->pData, newSize);
|
||||
if (tmp != NULL) {
|
||||
pCol2->pData = tmp;
|
||||
colDataMergeCol(pCol2, pDest->info.rows, pCol1, pSrc->info.rows);
|
||||
} else {
|
||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
pDest->info.rows += pSrc->info.rows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
size_t blockDataGetSize(const SSDataBlock* pBlock) {
|
||||
assert(pBlock != NULL);
|
||||
|
||||
size_t total = 0;
|
||||
int32_t numOfCols = pBlock->info.numOfCols;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
total += colDataGetSize(pColInfoData, pBlock->info.rows);
|
||||
}
|
||||
|
||||
// bitmap for each column
|
||||
total += BitmapLen(pBlock->info.rows) * numOfCols;
|
||||
return total;
|
||||
}
|
||||
|
||||
// the number of tuples can be fit in one page.
|
||||
// Actual data rows pluses the corresponding meta data must fit in one memory buffer of the given page size.
|
||||
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize) {
|
||||
ASSERT(pBlock != NULL && stopIndex != NULL);
|
||||
|
||||
int32_t size = 0;
|
||||
int32_t numOfCols = pBlock->info.numOfCols;
|
||||
int32_t numOfRows = pBlock->info.rows;
|
||||
|
||||
size_t headerSize = sizeof(int32_t);
|
||||
|
||||
// TODO speedup by checking if the whole page can fit in firstly.
|
||||
|
||||
if (!hasVarCol) {
|
||||
size_t rowSize = blockDataGetRowSize(pBlock);
|
||||
int32_t capacity = ((pageSize - headerSize) / (rowSize * 8 + 1)) * 8;
|
||||
*stopIndex = startIndex + capacity;
|
||||
|
||||
if (*stopIndex >= numOfRows) {
|
||||
*stopIndex = numOfRows - 1;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
// iterate the rows that can be fit in this buffer page
|
||||
size += headerSize;
|
||||
|
||||
for(int32_t j = startIndex; j < numOfRows; ++j) {
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
bool isNull = colDataIsNull(pColInfoData, numOfRows, j, NULL);
|
||||
if (isNull) {
|
||||
// do nothing
|
||||
} else {
|
||||
char* p = colDataGet(pColInfoData, j);
|
||||
size += varDataTLen(p);
|
||||
}
|
||||
|
||||
size += sizeof(pColInfoData->varmeta.offset[0]);
|
||||
} else {
|
||||
size += pColInfoData->info.bytes;
|
||||
|
||||
if (((j - startIndex) % 8) == 0) {
|
||||
size += 1; // the space for null bitmap
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (size > pageSize) {
|
||||
*stopIndex = j - 1;
|
||||
ASSERT(*stopIndex > startIndex);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
// all fit in
|
||||
*stopIndex = numOfRows - 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* +---------------------------+---------------------+
|
||||
* |the number of rows(4 bytes)| column #1 |
|
||||
* |---------------------+
|
||||
* | | null bitmap| values |
|
||||
* +---------------------------+---------------------+
|
||||
* @param buf
|
||||
* @param pBlock
|
||||
* @return
|
||||
*/
|
||||
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
|
||||
ASSERT(pBlock != NULL);
|
||||
|
||||
// write the number of rows
|
||||
*(uint32_t*) buf = pBlock->info.rows;
|
||||
|
||||
int32_t numOfCols = pBlock->info.numOfCols;
|
||||
int32_t numOfRows = pBlock->info.rows;
|
||||
|
||||
char* pStart = buf + sizeof(uint32_t);
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t));
|
||||
pStart += numOfRows * sizeof(int32_t);
|
||||
} else {
|
||||
memcpy(pStart, pCol->nullbitmap, BitmapLen(numOfRows));
|
||||
pStart += BitmapLen(pBlock->info.rows);
|
||||
}
|
||||
|
||||
uint32_t dataSize = colDataGetSize(pCol, numOfRows);
|
||||
memcpy(pStart, pCol->pData, dataSize);
|
||||
pStart += dataSize;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t blockDataGetRowSize(const SSDataBlock* pBlock) {
|
||||
ASSERT(pBlock != NULL);
|
||||
size_t rowSize = 0;
|
||||
|
||||
size_t numOfCols = pBlock->info.numOfCols;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||
rowSize += pColInfo->info.bytes;
|
||||
}
|
||||
|
||||
return rowSize;
|
||||
}
|
||||
|
||||
typedef struct SSDataBlockSortHelper {
|
||||
SArray *orderInfo; // SArray<SBlockOrderInfo>
|
||||
SSDataBlock *pDataBlock;
|
||||
bool nullFirst;
|
||||
} SSDataBlockSortHelper;
|
||||
|
||||
int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
|
||||
const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*) param;
|
||||
|
||||
SSDataBlock* pDataBlock = pHelper->pDataBlock;
|
||||
|
||||
int32_t* left = (int32_t*) p1;
|
||||
int32_t* right = (int32_t*) p2;
|
||||
|
||||
SArray* pInfo = pHelper->orderInfo;
|
||||
size_t num = taosArrayGetSize(pInfo);
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
SBlockOrderInfo* pOrder = taosArrayGet(pInfo, i);
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pOrder->colIndex);
|
||||
|
||||
bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, *left, pDataBlock->pBlockAgg);
|
||||
bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, *right, pDataBlock->pBlockAgg);
|
||||
if (leftNull && rightNull) {
|
||||
continue; // continue to next slot
|
||||
}
|
||||
|
||||
if (rightNull) {
|
||||
return pHelper->nullFirst? 1:-1;
|
||||
}
|
||||
|
||||
if (leftNull) {
|
||||
return pHelper->nullFirst? -1:1;
|
||||
}
|
||||
|
||||
void* left1 = colDataGet(pColInfoData, *left);
|
||||
void* right1 = colDataGet(pColInfoData, *right);
|
||||
|
||||
switch(pColInfoData->info.type) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
if (*(int32_t*) left1 == *(int32_t*) right1) {
|
||||
continue;// TODO continue
|
||||
} else {
|
||||
if (pOrder->order == TSDB_ORDER_ASC) {
|
||||
return (*(int32_t*) left1 < *(int32_t*) right1)? -1:1;
|
||||
} else {
|
||||
return (*(int32_t*) left1 < *(int32_t*) right1)? 1:-1;
|
||||
}
|
||||
}
|
||||
}
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, int32_t tupleIndex) {
|
||||
int32_t numOfCols = pSrcBlock->info.numOfCols;
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pDst = &pDstCols[i];
|
||||
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, i);
|
||||
|
||||
bool isNull = colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, NULL);
|
||||
if (isNull) {
|
||||
colDataAppend(pDst, numOfRows, NULL, true);
|
||||
} else {
|
||||
char* p = colDataGet((SColumnInfoData*)pSrc, tupleIndex);
|
||||
colDataAppend(pDst, numOfRows, p, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, int32_t* index) {
|
||||
for (int32_t i = 0; i < pDataBlock->info.rows; ++i) {
|
||||
doAssignOneTuple(pCols, i, pDataBlock, index[i]);
|
||||
}
|
||||
}
|
||||
|
||||
static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t numOfCols = pDataBlock->info.numOfCols;
|
||||
|
||||
SColumnInfoData* pCols = calloc(numOfCols, sizeof(SColumnInfoData));
|
||||
if (pCols == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
pCols[i].info = pColInfoData->info;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCols[i].info.type)) {
|
||||
pCols[i].varmeta.offset = calloc(rows, sizeof(int32_t));
|
||||
} else {
|
||||
pCols[i].nullbitmap = calloc(1, BitmapLen(rows));
|
||||
pCols[i].pData = calloc(rows, pCols[i].info.bytes);
|
||||
}
|
||||
}
|
||||
|
||||
return pCols;
|
||||
}
|
||||
|
||||
static int32_t copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
|
||||
int32_t numOfCols = pDataBlock->info.numOfCols;
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
pColInfoData->info = pCols[i].info;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
tfree(pColInfoData->varmeta.offset);
|
||||
pColInfoData->varmeta = pCols[i].varmeta;
|
||||
} else {
|
||||
tfree(pColInfoData->nullbitmap);
|
||||
pColInfoData->nullbitmap = pCols[i].nullbitmap;
|
||||
}
|
||||
|
||||
tfree(pColInfoData->pData);
|
||||
pColInfoData->pData = pCols[i].pData;
|
||||
}
|
||||
|
||||
tfree(pCols);
|
||||
}
|
||||
|
||||
static int32_t* createTupleIndex(size_t rows) {
|
||||
int32_t* index = calloc(rows, sizeof(int32_t));
|
||||
if (index == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < rows; ++i) {
|
||||
index[i] = i;
|
||||
}
|
||||
|
||||
return index;
|
||||
}
|
||||
|
||||
static void destroyTupleIndex(int32_t* index) {
|
||||
tfree(index);
|
||||
}
|
||||
|
||||
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) {
|
||||
ASSERT(pDataBlock != NULL && pOrderInfo != NULL);
|
||||
if (pDataBlock->info.rows <= 1) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// Allocate the additional buffer.
|
||||
uint32_t rows = pDataBlock->info.rows;
|
||||
int32_t* index = createTupleIndex(rows);
|
||||
if (index == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
|
||||
taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
|
||||
|
||||
int32_t numOfCols = pDataBlock->info.numOfCols;
|
||||
SColumnInfoData* pCols = createHelpColInfoData(pDataBlock);
|
||||
if (pCols == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
#if 0
|
||||
SColumnInfoData* px = taosArrayGet(pDataBlock->pDataBlock, 0);
|
||||
for(int32_t i = 0; i < pDataBlock->info.rows; ++i) {
|
||||
printf("%d, %d, %d\n", index[i], ((int32_t*)px->pData)[i], ((int32_t*)px->pData)[index[i]]);
|
||||
}
|
||||
#endif
|
||||
blockDataAssign(pCols, pDataBlock, index);
|
||||
|
||||
#if 0
|
||||
for(int32_t i = 0; i < pDataBlock->info.rows; ++i) {
|
||||
if (colDataIsNull(&pCols[0], rows, i, NULL)) {
|
||||
printf("0\t");
|
||||
} else {
|
||||
printf("%d\t", ((int32_t*)pCols[0].pData)[i]);
|
||||
}
|
||||
}
|
||||
|
||||
printf("end\n");
|
||||
#endif
|
||||
|
||||
copyBackToBlock(pDataBlock, pCols);
|
||||
destroyTupleIndex(index);
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
#include <common.h>
|
||||
#include "os.h"
|
||||
#include "tutil.h"
|
||||
|
||||
|
@ -268,4 +269,27 @@ SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* nam
|
|||
|
||||
tstrncpy(s.name, name, tListLen(s.name));
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
void* destroySDataBlock(SSDataBlock* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t numOfOutput = pBlock->info.numOfCols;
|
||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
tfree(pColInfoData->varmeta.offset);
|
||||
} else {
|
||||
tfree(pColInfoData->nullbitmap);
|
||||
}
|
||||
|
||||
tfree(pColInfoData->pData);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pBlock->pDataBlock);
|
||||
tfree(pBlock->pBlockAgg);
|
||||
tfree(pBlock);
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
#include <common.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include <tep.h>
|
||||
#include <iostream>
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
|
||||
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||
#include "os.h"
|
||||
|
||||
|
@ -96,4 +97,199 @@ TEST(testCase, toInteger_test) {
|
|||
ASSERT_EQ(ret, -1);
|
||||
}
|
||||
|
||||
TEST(testCase, Datablock_test) {
|
||||
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
|
||||
b->info.numOfCols = 2;
|
||||
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
|
||||
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.bytes = 4;
|
||||
infoData.info.type = TSDB_DATA_TYPE_INT;
|
||||
infoData.info.colId = 1;
|
||||
|
||||
infoData.pData = (char*) calloc(40, infoData.info.bytes);
|
||||
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (40/8));
|
||||
taosArrayPush(b->pDataBlock, &infoData);
|
||||
|
||||
SColumnInfoData infoData1 = {0};
|
||||
infoData1.info.bytes = 40;
|
||||
infoData1.info.type = TSDB_DATA_TYPE_BINARY;
|
||||
infoData1.info.colId = 2;
|
||||
|
||||
infoData1.varmeta.offset = (int32_t*) calloc(40, sizeof(uint32_t));
|
||||
taosArrayPush(b->pDataBlock, &infoData1);
|
||||
|
||||
char* str = "the value of: %d";
|
||||
char buf[128] = {0};
|
||||
char varbuf[128] = {0};
|
||||
|
||||
for(int32_t i = 0; i < 40; ++i) {
|
||||
SColumnInfoData* p0 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 0);
|
||||
SColumnInfoData* p1 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 1);
|
||||
|
||||
if (i&0x01) {
|
||||
int32_t len = sprintf(buf, str, i);
|
||||
STR_TO_VARSTR(varbuf, buf)
|
||||
colDataAppend(p0, i, (const char*) &i, false);
|
||||
colDataAppend(p1, i, (const char*) varbuf, false);
|
||||
|
||||
memset(varbuf, 0, sizeof(varbuf));
|
||||
memset(buf, 0, sizeof(buf));
|
||||
} else {
|
||||
colDataAppend(p0, i, (const char*) &i, true);
|
||||
colDataAppend(p1, i, (const char*) varbuf, true);
|
||||
}
|
||||
|
||||
b->info.rows++;
|
||||
}
|
||||
|
||||
SColumnInfoData* p0 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 0);
|
||||
SColumnInfoData* p1 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 1);
|
||||
for(int32_t i = 0; i < 40; ++i) {
|
||||
if (i & 0x01) {
|
||||
ASSERT_EQ(colDataIsNull_f(p0->nullbitmap, i), false);
|
||||
ASSERT_EQ(colDataIsNull(p1, b->info.rows, i, nullptr), false);
|
||||
} else {
|
||||
ASSERT_EQ(colDataIsNull_f(p0->nullbitmap, i), true);
|
||||
|
||||
ASSERT_EQ(colDataIsNull(p0, b->info.rows, i, nullptr), true);
|
||||
ASSERT_EQ(colDataIsNull(p1, b->info.rows, i, nullptr), true);
|
||||
}
|
||||
}
|
||||
|
||||
printf("binary column length:%d\n", *(int32_t*) p1->pData);
|
||||
|
||||
ASSERT_EQ(colDataGetNumOfCols(b), 2);
|
||||
ASSERT_EQ(colDataGetNumOfRows(b), 40);
|
||||
|
||||
char* pData = colDataGet(p1, 3);
|
||||
printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData));
|
||||
|
||||
SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo));
|
||||
SBlockOrderInfo order = {.order = TSDB_ORDER_ASC, .colIndex = 0};
|
||||
taosArrayPush(pOrderInfo, &order);
|
||||
|
||||
blockDataSort(b, pOrderInfo, true);
|
||||
destroySDataBlock(b);
|
||||
|
||||
taosArrayDestroy(pOrderInfo);
|
||||
}
|
||||
|
||||
#if 0
|
||||
TEST(testCase, non_var_dataBlock_split_test) {
|
||||
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
|
||||
b->info.numOfCols = 2;
|
||||
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
|
||||
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.bytes = 4;
|
||||
infoData.info.type = TSDB_DATA_TYPE_INT;
|
||||
infoData.info.colId = 1;
|
||||
|
||||
int32_t numOfRows = 1000000;
|
||||
|
||||
infoData.pData = (char*) calloc(numOfRows, infoData.info.bytes);
|
||||
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
|
||||
taosArrayPush(b->pDataBlock, &infoData);
|
||||
|
||||
SColumnInfoData infoData1 = {0};
|
||||
infoData1.info.bytes = 1;
|
||||
infoData1.info.type = TSDB_DATA_TYPE_TINYINT;
|
||||
infoData1.info.colId = 2;
|
||||
|
||||
infoData1.pData = (char*) calloc(numOfRows, infoData.info.bytes);
|
||||
infoData1.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
|
||||
taosArrayPush(b->pDataBlock, &infoData1);
|
||||
|
||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
||||
SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0);
|
||||
SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1);
|
||||
|
||||
int8_t v = i;
|
||||
colDataAppend(p0, i, (const char*)&i, false);
|
||||
colDataAppend(p1, i, (const char*)&v, false);
|
||||
b->info.rows++;
|
||||
}
|
||||
|
||||
int32_t pageSize = 64 * 1024;
|
||||
|
||||
int32_t startIndex= 0;
|
||||
int32_t stopIndex = 0;
|
||||
int32_t count = 1;
|
||||
while(1) {
|
||||
blockDataSplitRows(b, false, startIndex, &stopIndex, pageSize);
|
||||
printf("the %d split, from: %d to %d\n", count++, startIndex, stopIndex);
|
||||
|
||||
if (stopIndex == numOfRows - 1) {
|
||||
break;
|
||||
}
|
||||
|
||||
startIndex = stopIndex + 1;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
TEST(testCase, var_dataBlock_split_test) {
|
||||
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
|
||||
b->info.numOfCols = 2;
|
||||
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
|
||||
|
||||
int32_t numOfRows = 1000000;
|
||||
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.bytes = 4;
|
||||
infoData.info.type = TSDB_DATA_TYPE_INT;
|
||||
infoData.info.colId = 1;
|
||||
|
||||
infoData.pData = (char*) calloc(numOfRows, infoData.info.bytes);
|
||||
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
|
||||
taosArrayPush(b->pDataBlock, &infoData);
|
||||
|
||||
SColumnInfoData infoData1 = {0};
|
||||
infoData1.info.bytes = 40;
|
||||
infoData1.info.type = TSDB_DATA_TYPE_BINARY;
|
||||
infoData1.info.colId = 2;
|
||||
|
||||
infoData1.varmeta.offset = (int32_t*) calloc(numOfRows, sizeof(uint32_t));
|
||||
taosArrayPush(b->pDataBlock, &infoData1);
|
||||
|
||||
char buf[41] = {0};
|
||||
char buf1[100] = {0};
|
||||
|
||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
||||
SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0);
|
||||
SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1);
|
||||
|
||||
int8_t v = i;
|
||||
colDataAppend(p0, i, (const char*)&i, false);
|
||||
|
||||
sprintf(buf, "the number of row:%d", i);
|
||||
int32_t len = sprintf(buf1, buf, i);
|
||||
STR_TO_VARSTR(buf1, buf)
|
||||
colDataAppend(p1, i, buf1, false);
|
||||
b->info.rows++;
|
||||
|
||||
memset(buf, 0, sizeof(buf));
|
||||
memset(buf1, 0, sizeof(buf1));
|
||||
}
|
||||
|
||||
int32_t pageSize = 64 * 1024;
|
||||
|
||||
int32_t startIndex= 0;
|
||||
int32_t stopIndex = 0;
|
||||
int32_t count = 1;
|
||||
while(1) {
|
||||
blockDataSplitRows(b, true, startIndex, &stopIndex, pageSize);
|
||||
printf("the %d split, from: %d to %d\n", count++, startIndex, stopIndex);
|
||||
|
||||
if (stopIndex == numOfRows - 1) {
|
||||
break;
|
||||
}
|
||||
|
||||
startIndex = stopIndex + 1;
|
||||
}
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
|
@ -3640,13 +3640,13 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
|
|||
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
|
||||
STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
|
||||
if (pTbCfg == NULL) {
|
||||
// tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId);
|
||||
tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId);
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (pTbCfg->type != META_SUPER_TABLE) {
|
||||
// tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId);
|
||||
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId);
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
|
||||
goto _error;
|
||||
}
|
||||
|
@ -3665,8 +3665,8 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
|
|||
pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
|
||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
|
||||
|
||||
// tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb,
|
||||
// pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
|
||||
tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta,
|
||||
pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
|
||||
|
||||
taosArrayDestroy(res);
|
||||
return ret;
|
||||
|
|
|
@ -557,17 +557,15 @@ typedef struct SMultiwayMergeInfo {
|
|||
|
||||
// todo support the disk-based sort
|
||||
typedef struct SOrderOperatorInfo {
|
||||
int32_t colIndex;
|
||||
int32_t order;
|
||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||
SArray *orderInfo; // SArray<SBlockOrderInfo>
|
||||
SSDataBlock *pDataBlock;
|
||||
bool nullFirst; // null value is put in the front
|
||||
} SOrderOperatorInfo;
|
||||
|
||||
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
|
||||
|
@ -591,7 +589,8 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
|
|||
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
|
||||
|
||||
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput);
|
||||
SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal);
|
||||
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SOrder* pOrderVal);
|
||||
SOperatorInfo* createMergeSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal);
|
||||
|
||||
//SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
|
||||
//SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
|
||||
|
@ -604,7 +603,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
|
|||
|
||||
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
|
||||
|
||||
void* destroyOutputBuf(SSDataBlock* pBlock);
|
||||
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
|
||||
|
||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
||||
|
@ -613,7 +611,6 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
|
|||
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
|
||||
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||
|
||||
void freeParam(STaskParam *param);
|
||||
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo,
|
||||
SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo);
|
||||
|
||||
|
|
|
@ -12,12 +12,13 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include "parser.h"
|
||||
#include "tq.h"
|
||||
#include <tep.h>
|
||||
#include "exception.h"
|
||||
#include "os.h"
|
||||
#include "parser.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmsg.h"
|
||||
#include "tq.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#include "executorimpl.h"
|
||||
|
@ -337,23 +338,6 @@ SSDataBlock* createOutputBuf_rv(SArray* pExprInfo, int32_t numOfRows) {
|
|||
return res;
|
||||
}
|
||||
|
||||
void* destroyOutputBuf(SSDataBlock* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t numOfOutput = pBlock->info.numOfCols;
|
||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
tfree(pColInfoData->pData);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pBlock->pDataBlock);
|
||||
tfree(pBlock->pBlockAgg);
|
||||
tfree(pBlock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static bool isSelectivityWithTagsQuery(SqlFunctionCtx *pCtx, int32_t numOfOutput) {
|
||||
return true;
|
||||
// bool hasTags = false;
|
||||
|
@ -2188,174 +2172,6 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT
|
|||
|
||||
// group by normal column, sliding window query, interval query are handled by interval query processor
|
||||
// interval (down sampling operation)
|
||||
int32_t numOfOperator = (int32_t) taosArrayGetSize(pOperator);
|
||||
for(int32_t i = 0; i < numOfOperator; ++i) {
|
||||
int32_t* op = taosArrayGet(pOperator, i);
|
||||
|
||||
switch (*op) {
|
||||
// case OP_TagScan: {
|
||||
// pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
// break;
|
||||
// }
|
||||
// case OP_MultiTableTimeInterval: {
|
||||
// pRuntimeEnv->proot =
|
||||
// createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
|
||||
// break;
|
||||
// }
|
||||
// case OP_AllMultiTableTimeInterval: {
|
||||
// pRuntimeEnv->proot =
|
||||
// createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
|
||||
// break;
|
||||
// }
|
||||
// case OP_TimeWindow: {
|
||||
// pRuntimeEnv->proot =
|
||||
// createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
|
||||
// if (opType != OP_DummyInput && opType != OP_Join) {
|
||||
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
// case OP_AllTimeWindow: {
|
||||
// pRuntimeEnv->proot =
|
||||
// createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
|
||||
// if (opType != OP_DummyInput && opType != OP_Join) {
|
||||
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
// case OP_Groupby: {
|
||||
// pRuntimeEnv->proot =
|
||||
// createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
//
|
||||
// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
|
||||
// if (opType != OP_DummyInput) {
|
||||
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
// case OP_SessionWindow: {
|
||||
// pRuntimeEnv->proot =
|
||||
// createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
|
||||
// if (opType != OP_DummyInput) {
|
||||
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
// case OP_MultiTableAggregate: {
|
||||
// pRuntimeEnv->proot =
|
||||
// createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
|
||||
// break;
|
||||
// }
|
||||
// case OP_Aggregate: {
|
||||
// pRuntimeEnv->proot =
|
||||
// createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
//
|
||||
// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
|
||||
// if (opType != OP_DummyInput && opType != OP_Join) {
|
||||
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case OP_Project: { // TODO refactor to remove arith operator.
|
||||
// SOperatorInfo* prev = pRuntimeEnv->proot;
|
||||
// if (i == 0) {
|
||||
// pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
// if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput && prev->operatorType != OP_Join) { // TODO refactor
|
||||
// setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot);
|
||||
// }
|
||||
// } else {
|
||||
// prev = pRuntimeEnv->proot;
|
||||
// assert(pQueryAttr->pExpr2 != NULL);
|
||||
// pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case OP_StateWindow: {
|
||||
// pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType;
|
||||
// if (opType != OP_DummyInput) {
|
||||
// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case OP_Limit: {
|
||||
// pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case OP_Filter: { // todo refactor
|
||||
// int32_t numOfFilterCols = 0;
|
||||
// if (pQueryAttr->stableQuery) {
|
||||
// SColumnInfo* pColInfo =
|
||||
// extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
|
||||
// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||
// pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
|
||||
// freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
|
||||
// } else {
|
||||
// SColumnInfo* pColInfo =
|
||||
// extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
|
||||
// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
|
||||
// pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
|
||||
// freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
|
||||
// }
|
||||
//
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case OP_Fill: {
|
||||
// SOperatorInfo* pInfo = pRuntimeEnv->proot;
|
||||
// pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult);
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case OP_MultiwayMergeSort: {
|
||||
// pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger);
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock.
|
||||
// bool multigroupResult = pQueryAttr->multigroupResult;
|
||||
// if (pQueryAttr->multigroupResult) {
|
||||
// multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE);
|
||||
// }
|
||||
//
|
||||
// pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||
// pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult);
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case OP_SLimit: {
|
||||
// int32_t num = pRuntimeEnv->proot->numOfOutput;
|
||||
// SExprInfo* pExpr = pRuntimeEnv->proot->pExpr;
|
||||
// pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult);
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case OP_Distinct: {
|
||||
// pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case OP_Order: {
|
||||
// pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order);
|
||||
// break;
|
||||
// }
|
||||
|
||||
default: {
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_clean:
|
||||
|
@ -5071,6 +4887,8 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
|
|||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||
|
||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||
pBlockInfo->rows = 0;
|
||||
|
||||
while (tqNextDataBlock(pInfo->readerHandle)) {
|
||||
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -5605,7 +5423,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
|
||||
taosArrayDestroy(pInfo->orderColumnList);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = destroySDataBlock(pInfo->pRes);
|
||||
tfree(pInfo->prevRow);
|
||||
}
|
||||
|
||||
|
@ -5717,30 +5535,6 @@ SOperatorInfo *createMultiwaySortOperatorInfo(STaskRuntimeEnv *pRuntimeEnv, SExp
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) {
|
||||
assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols);
|
||||
|
||||
int32_t numOfCols = pSrc->info.numOfCols;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
|
||||
|
||||
int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes;
|
||||
char* tmp = realloc(pCol2->pData, newSize);
|
||||
if (tmp != NULL) {
|
||||
pCol2->pData = tmp;
|
||||
int32_t offset = pCol2->info.bytes * pDest->info.rows;
|
||||
memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes);
|
||||
} else {
|
||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
pDest->info.rows += pSrc->info.rows;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSDataBlock* doSort(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -5761,64 +5555,56 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
|
|||
break;
|
||||
}
|
||||
|
||||
int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock);
|
||||
int32_t code = blockDataMerge(pInfo->pDataBlock, pBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo handle error
|
||||
}
|
||||
|
||||
size_t size = blockDataGetSize(pInfo->pDataBlock);
|
||||
if (size > pInfo->sortBufSize) {
|
||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst);
|
||||
|
||||
// flush to disk
|
||||
}
|
||||
}
|
||||
|
||||
int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
|
||||
void** pCols = calloc(numOfCols, POINTER_BYTES);
|
||||
SSchema* pSchema = calloc(numOfCols, sizeof(SSchema));
|
||||
// int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
|
||||
// void** pCols = calloc(numOfCols, POINTER_BYTES);
|
||||
// SSchema* pSchema = calloc(numOfCols, sizeof(SSchema));
|
||||
//
|
||||
// for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
// SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i);
|
||||
// pCols[i] = p1->pData;
|
||||
// pSchema[i].colId = p1->info.colId;
|
||||
// pSchema[i].bytes = p1->info.bytes;
|
||||
// pSchema[i].type = (uint8_t) p1->info.type;
|
||||
// }
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i);
|
||||
pCols[i] = p1->pData;
|
||||
pSchema[i].colId = p1->info.colId;
|
||||
pSchema[i].bytes = p1->info.bytes;
|
||||
pSchema[i].type = (uint8_t) p1->info.type;
|
||||
}
|
||||
// __compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order);
|
||||
// taoscQSort(pCols, pInfo->pDataBlock->info.rows, sizeof(int32_t), pInfo, comp);
|
||||
|
||||
__compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order);
|
||||
// taosqsort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp);
|
||||
|
||||
tfree(pCols);
|
||||
tfree(pSchema);
|
||||
// tfree(pCols);
|
||||
// tfree(pSchema);
|
||||
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal) {
|
||||
SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SOrder* pOrderVal) {
|
||||
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
|
||||
|
||||
{
|
||||
SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock));
|
||||
pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
|
||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SColumnInfoData col = {{0}};
|
||||
col.info.colId = pExpr[i].base.pColumns->info.colId;
|
||||
// col.info.bytes = pExpr[i].base.colBytes;
|
||||
// col.info.type = pExpr[i].base.colType;
|
||||
taosArrayPush(pDataBlock->pDataBlock, &col);
|
||||
|
||||
// if (col.info.colId == pOrderVal->orderColId) {
|
||||
// pInfo->colIndex = i;
|
||||
// }
|
||||
}
|
||||
|
||||
pDataBlock->info.numOfCols = numOfOutput;
|
||||
// pInfo->order = pOrderVal->order;
|
||||
pInfo->pDataBlock = pDataBlock;
|
||||
}
|
||||
pInfo->sortBufSize = 1024 * 1024; // 1MB
|
||||
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, 4096);
|
||||
pInfo->orderInfo = taosArrayInit(1, sizeof(SOrder));
|
||||
taosArrayPush(pInfo->orderInfo, pOrderVal); // todo more than one order column
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
pOperator->name = "InMemoryOrder";
|
||||
// pOperator->operatorType = OP_Order;
|
||||
pOperator->name = "Order";
|
||||
pOperator->operatorType = OP_Order;
|
||||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->exec = doSort;
|
||||
pOperator->cleanupFn = destroyOrderOperatorInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
pOperator->cleanupFn = destroyOrderOperatorInfo;
|
||||
|
||||
appendDownstream(pOperator, downstream);
|
||||
return pOperator;
|
||||
|
@ -6794,7 +6580,7 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
|
|||
tfree(pInfo->rowCellInfoOffset);
|
||||
|
||||
cleanupResultRowInfo(&pInfo->resultRowInfo);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = destroySDataBlock(pInfo->pRes);
|
||||
}
|
||||
|
||||
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
|
@ -6818,7 +6604,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
|
||||
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = destroySDataBlock(pInfo->pRes);
|
||||
tfree(pInfo->p);
|
||||
}
|
||||
|
||||
|
@ -6835,12 +6621,12 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
|
||||
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
STagScanInfo* pInfo = (STagScanInfo*) param;
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = destroySDataBlock(pInfo->pRes);
|
||||
}
|
||||
|
||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
|
||||
pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock);
|
||||
pInfo->pDataBlock = destroySDataBlock(pInfo->pDataBlock);
|
||||
}
|
||||
|
||||
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
|
@ -6853,7 +6639,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosHashCleanup(pInfo->pSet);
|
||||
tfree(pInfo->buf);
|
||||
taosArrayDestroy(pInfo->pDistinctDataInfo);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
pInfo->pRes = destroySDataBlock(pInfo->pRes);
|
||||
}
|
||||
|
||||
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
|
@ -7717,6 +7503,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
|
|||
}
|
||||
|
||||
static tsdbReaderT doCreateDataReader(STableScanPhyNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId);
|
||||
|
||||
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId);
|
||||
|
||||
SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) {
|
||||
|
|
|
@ -216,7 +216,9 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable
|
|||
} else if (needSeqScan(pPlanNode)) {
|
||||
return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan);
|
||||
}
|
||||
return createUserTableScanNode(pPlanNode, pTable, OP_DataBlocksOptScan);
|
||||
|
||||
int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan;
|
||||
return createUserTableScanNode(pPlanNode, pTable, type);
|
||||
}
|
||||
|
||||
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
||||
|
|
Loading…
Reference in New Issue