Merge pull request #17908 from taosdata/feature/3_liaohj
refactor: do some internal refactor.
This commit is contained in:
commit
f22029b886
|
@ -137,6 +137,9 @@ static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, ui
|
|||
for (int32_t i = start; i < start + nRows; ++i) {
|
||||
colDataSetNull_f(pColumnInfoData->nullbitmap, i);
|
||||
}
|
||||
|
||||
int32_t bytes = pColumnInfoData->info.bytes;
|
||||
memset(pColumnInfoData->pData + start * bytes, 0, nRows * bytes);
|
||||
}
|
||||
|
||||
pColumnInfoData->hasNull = true;
|
||||
|
@ -215,7 +218,7 @@ size_t blockDataGetSerialMetaSize(uint32_t numOfCols);
|
|||
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
||||
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
|
||||
|
||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
|
||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||
|
||||
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||
|
|
|
@ -241,13 +241,6 @@ struct STag {
|
|||
memcpy(varDataVal(x), (str), __len); \
|
||||
} while (0);
|
||||
|
||||
#define STR_TO_NET_VARSTR(x, str) \
|
||||
do { \
|
||||
VarDataLenT __len = (VarDataLenT)strlen(str); \
|
||||
*(VarDataLenT *)(x) = htons(__len); \
|
||||
memcpy(varDataVal(x), (str), __len); \
|
||||
} while (0);
|
||||
|
||||
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \
|
||||
do { \
|
||||
char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \
|
||||
|
|
|
@ -182,7 +182,7 @@ struct SScalarParam {
|
|||
};
|
||||
|
||||
void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell);
|
||||
int32_t getNumOfResult(SqlFunctionCtx *pCtx, int32_t num, SSDataBlock *pResBlock);
|
||||
//int32_t getNumOfResult(SqlFunctionCtx *pCtx, int32_t num, SSDataBlock *pResBlock);
|
||||
bool isRowEntryCompleted(struct SResultRowEntryInfo *pEntry);
|
||||
bool isRowEntryInitialized(struct SResultRowEntryInfo *pEntry);
|
||||
|
||||
|
|
|
@ -224,7 +224,7 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc
|
|||
|
||||
typedef enum EFuncDataRequired {
|
||||
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
|
||||
FUNC_DATA_REQUIRED_STATIS_LOAD,
|
||||
FUNC_DATA_REQUIRED_SMA_LOAD,
|
||||
FUNC_DATA_REQUIRED_NOT_LOAD,
|
||||
FUNC_DATA_REQUIRED_FILTEROUT,
|
||||
} EFuncDataRequired;
|
||||
|
|
|
@ -246,7 +246,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int
|
|||
uint32_t finalNumOfRows = numOfRow1 + numOfRow2;
|
||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
// Handle the bitmap
|
||||
if (finalNumOfRows > *capacity || (numOfRow1 == 0 && pColumnInfoData->info.bytes != 0)) {
|
||||
if (finalNumOfRows > (*capacity)) {
|
||||
char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -280,16 +280,14 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int
|
|||
memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
|
||||
pColumnInfoData->varmeta.length = len + oldLen;
|
||||
} else {
|
||||
if (finalNumOfRows > *capacity || (numOfRow1 == 0 && pColumnInfoData->info.bytes != 0)) {
|
||||
if (finalNumOfRows > (*capacity)) {
|
||||
// all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0;
|
||||
// ASSERT(finalNumOfRows * pColumnInfoData->info.bytes);
|
||||
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pColumnInfoData->pData = tmp;
|
||||
|
||||
if (BitmapLen(numOfRow1) < BitmapLen(finalNumOfRows)) {
|
||||
char* btmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(finalNumOfRows));
|
||||
uint32_t extend = BitmapLen(finalNumOfRows) - BitmapLen(numOfRow1);
|
||||
|
@ -823,7 +821,7 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
|
|||
}
|
||||
|
||||
static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t rows = pDataBlock->info.capacity;
|
||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
|
||||
SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
|
||||
|
@ -1126,26 +1124,28 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
|
|||
}
|
||||
|
||||
void blockDataCleanup(SSDataBlock* pDataBlock) {
|
||||
pDataBlock->info.rows = 0;
|
||||
pDataBlock->info.groupId = 0;
|
||||
SDataBlockInfo* pInfo = &pDataBlock->info;
|
||||
|
||||
pDataBlock->info.window.ekey = 0;
|
||||
pDataBlock->info.window.skey = 0;
|
||||
pInfo->rows = 0;
|
||||
pInfo->groupId = 0;
|
||||
pInfo->window.ekey = 0;
|
||||
pInfo->window.skey = 0;
|
||||
|
||||
if (pDataBlock->info.capacity == 0) {
|
||||
if (pInfo->capacity == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
colInfoDataCleanup(p, pDataBlock->info.capacity);
|
||||
colInfoDataCleanup(p, pInfo->capacity);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows) {
|
||||
ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows);
|
||||
if (numOfRows < pBlockInfo->capacity) {
|
||||
// todo temporarily disable it
|
||||
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) {
|
||||
ASSERT(numOfRows > 0 /*&& pBlockInfo->capacity >= pBlockInfo->rows*/);
|
||||
if (numOfRows <= pBlockInfo->capacity) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1182,8 +1182,10 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
|
||||
pColumn->pData = tmp;
|
||||
if (clearPayload) {
|
||||
memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1191,6 +1193,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
|
|||
|
||||
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
||||
pColumn->hasNull = false;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
|
||||
pColumn->varmeta.length = 0;
|
||||
if (pColumn->varmeta.offset != NULL) {
|
||||
|
@ -1203,30 +1206,27 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload) {
|
||||
SDataBlockInfo info = {0};
|
||||
return doEnsureCapacity(pColumn, &info, numOfRows);
|
||||
return doEnsureCapacity(pColumn, &info, numOfRows, clearPayload);
|
||||
}
|
||||
|
||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
|
||||
int32_t code = 0;
|
||||
if (numOfRows == 0) {
|
||||
if (numOfRows == 0 || numOfRows <= pDataBlock->info.capacity) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pDataBlock->info.capacity < numOfRows) {
|
||||
pDataBlock->info.capacity = numOfRows;
|
||||
}
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
code = doEnsureCapacity(p, &pDataBlock->info, numOfRows);
|
||||
code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, true);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
pDataBlock->info.capacity = numOfRows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1065,8 +1065,8 @@ void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema) {
|
|||
|
||||
void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColVal) {
|
||||
STColumn *pTColumn = &pTSchema->columns[iCol];
|
||||
SCellVal cv;
|
||||
SValue value;
|
||||
SCellVal cv = {0};
|
||||
SValue value = {0};
|
||||
|
||||
ASSERT((pTColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) || (iCol > 0));
|
||||
|
||||
|
|
|
@ -240,25 +240,22 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfCols = pShow->pMeta->numOfColumns;
|
||||
SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
||||
int32_t numOfCols = pShow->pMeta->numOfColumns;
|
||||
|
||||
SSDataBlock *pBlock = createDataBlock();
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData idata = {0};
|
||||
SSchema *p = &pShow->pMeta->pSchemas[i];
|
||||
|
||||
SSchema *p = &pShow->pMeta->pSchemas[i];
|
||||
|
||||
idata.info.bytes = p->bytes;
|
||||
idata.info.type = p->type;
|
||||
idata.info.colId = p->colId;
|
||||
|
||||
taosArrayPush(pBlock->pDataBlock, &idata);
|
||||
if (IS_VAR_DATA_TYPE(p->type)) {
|
||||
pBlock->info.hasVarCol = true;
|
||||
}
|
||||
blockDataAppendColInfo(pBlock, &idata);
|
||||
}
|
||||
|
||||
blockDataEnsureCapacity(pBlock, rowsToRead);
|
||||
|
||||
if (mndCheckRetrieveFinished(pShow)) {
|
||||
mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
|
||||
rowsRead = 0;
|
||||
|
|
|
@ -644,6 +644,13 @@ typedef struct SSttBlockLoadInfo {
|
|||
int16_t *colIds;
|
||||
int32_t numOfCols;
|
||||
bool sttBlockLoaded;
|
||||
|
||||
// keep the last access position, this position may be used to reduce the binary times for
|
||||
// starting last block data for a new table
|
||||
struct {
|
||||
int32_t blockIndex;
|
||||
int32_t rowIndex;
|
||||
} prevEndPos;
|
||||
} SSttBlockLoadInfo;
|
||||
|
||||
typedef struct SMergeTree {
|
||||
|
|
|
@ -544,7 +544,6 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
|
|||
taosMemoryFree(pResBlock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
|
|
|
@ -536,10 +536,10 @@ typedef struct SSysTableScanInfo {
|
|||
} SSysTableScanInfo;
|
||||
|
||||
typedef struct SBlockDistInfo {
|
||||
SSDataBlock* pResBlock;
|
||||
STsdbReader* pHandle;
|
||||
SReadHandle readHandle;
|
||||
uint64_t uid; // table uid
|
||||
SSDataBlock* pResBlock;
|
||||
STsdbReader* pHandle;
|
||||
SReadHandle readHandle;
|
||||
uint64_t uid; // table uid
|
||||
} SBlockDistInfo;
|
||||
|
||||
// todo remove this
|
||||
|
@ -550,7 +550,6 @@ typedef struct SOptrBasicInfo {
|
|||
} SOptrBasicInfo;
|
||||
|
||||
typedef struct SIntervalAggOperatorInfo {
|
||||
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
||||
SOptrBasicInfo binfo; // basic info
|
||||
SAggSupporter aggSup; // aggregate supporter
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
|
@ -571,7 +570,6 @@ typedef struct SIntervalAggOperatorInfo {
|
|||
typedef struct SMergeAlignedIntervalAggOperatorInfo {
|
||||
SIntervalAggOperatorInfo* intervalAggOperatorInfo;
|
||||
|
||||
// bool hasGroupId;
|
||||
uint64_t groupId; // current groupId
|
||||
int64_t curTs; // current ts
|
||||
SSDataBlock* prefetchedBlock;
|
||||
|
@ -839,10 +837,6 @@ typedef struct SSortOperatorInfo {
|
|||
SNode* pCondition;
|
||||
} SSortOperatorInfo;
|
||||
|
||||
typedef struct STagFilterOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
} STagFilterOperatorInfo;
|
||||
|
||||
typedef struct SJoinOperatorInfo {
|
||||
SSDataBlock* pRes;
|
||||
int32_t joinType;
|
||||
|
@ -907,7 +901,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
|
|||
|
||||
void cleanupAggSup(SAggSupporter* pAggSup);
|
||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
|
||||
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
|
||||
|
||||
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts);
|
||||
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts);
|
||||
|
|
|
@ -170,14 +170,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
|
||||
pRes->info.rows = 1;
|
||||
|
||||
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
||||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows,
|
||||
GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
|
||||
pRes->info.rows, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pRes->info.groupId = getTableGroupId(pTableList, pRes->info.uid);
|
||||
|
|
|
@ -379,7 +379,7 @@ static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarPara
|
|||
pColumnData->info.scale = pType->scale;
|
||||
pColumnData->info.precision = pType->precision;
|
||||
|
||||
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows);
|
||||
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
taosMemoryFree(pColumnData);
|
||||
|
@ -1824,8 +1824,6 @@ static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
|
|||
}
|
||||
|
||||
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
||||
int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
|
||||
|
@ -1847,6 +1845,11 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
|||
|
||||
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
|
||||
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
|
||||
if (pTableListInfo->groupOffset == NULL) {
|
||||
taosArrayDestroy(pList);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
|
||||
taosArrayDestroy(pList);
|
||||
return TDB_CODE_SUCCESS;
|
||||
|
|
|
@ -87,7 +87,7 @@ static void releaseQueryBuf(size_t numOfTables);
|
|||
|
||||
static void destroyFillOperatorInfo(void* param);
|
||||
static void destroyProjectOperatorInfo(void* param);
|
||||
static void destroyOrderOperatorInfo(void* param);
|
||||
static void destroySortOperatorInfo(void* param);
|
||||
static void destroyAggOperatorInfo(void* param);
|
||||
|
||||
static void destroyIntervalOperatorInfo(void* param);
|
||||
|
@ -322,7 +322,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
|
|||
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
pColData->info.bytes = sizeof(int64_t);
|
||||
|
||||
colInfoDataEnsureCapacity(pColData, 5);
|
||||
colInfoDataEnsureCapacity(pColData, 5, false);
|
||||
colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
|
||||
colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);
|
||||
|
||||
|
@ -439,7 +439,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
|
|||
pColInfo = pInput->pData[paramIndex];
|
||||
}
|
||||
|
||||
colInfoDataEnsureCapacity(pColInfo, numOfRows);
|
||||
colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
|
||||
|
||||
int8_t type = pFuncParam->param.nType;
|
||||
if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
|
||||
|
@ -1047,8 +1047,6 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pCol
|
|||
SFilterInfo* filter = pFilterInfo;
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
// pError("start filter");
|
||||
|
||||
// todo move to the initialization function
|
||||
int32_t code = 0;
|
||||
bool needFree = false;
|
||||
|
|
|
@ -336,8 +336,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
return buildGroupResultDataBlock(pOperator);
|
||||
}
|
||||
|
@ -390,7 +388,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
#endif
|
||||
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
|
||||
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
@ -422,6 +419,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
|
|||
}
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -1082,14 +1081,16 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
|||
}
|
||||
pInfo->partitionSup.needCalc = true;
|
||||
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPartNode->part.node.pOutputDataBlockDesc);
|
||||
if (!pResBlock) {
|
||||
pInfo->binfo.pRes = createResDataBlock(pPartNode->part.node.pOutputDataBlockDesc);
|
||||
if (pInfo->binfo.pRes == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
blockDataEnsureCapacity(pResBlock, 4096);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, 4096);
|
||||
|
||||
pInfo->parIte = NULL;
|
||||
pInfo->pInputDataBlock = NULL;
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||
pInfo->tsColIndex = 0;
|
||||
|
|
|
@ -384,6 +384,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
|||
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
|
||||
blockDataEnsureCapacity(pResBlock, numOfRows);
|
||||
|
||||
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -391,8 +392,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
|||
}
|
||||
|
||||
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pCondition = pPhyNode->node.pConditions;
|
||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||
|
||||
|
|
|
@ -58,6 +58,18 @@ typedef struct {
|
|||
void* pVnode;
|
||||
} SSTabFltArg;
|
||||
|
||||
typedef struct STableMergeScanExecInfo {
|
||||
SFileBlockLoadRecorder blockRecorder;
|
||||
SSortExecInfo sortExecInfo;
|
||||
} STableMergeScanExecInfo;
|
||||
|
||||
typedef struct STableMergeScanSortSourceParam {
|
||||
SOperatorInfo* pOperator;
|
||||
int32_t readerIdx;
|
||||
uint64_t uid;
|
||||
SSDataBlock* inputBlock;
|
||||
} STableMergeScanSortSourceParam;
|
||||
|
||||
static int32_t sysChkFilter__Comm(SNode* pNode);
|
||||
static int32_t sysChkFilter__DBName(SNode* pNode);
|
||||
static int32_t sysChkFilter__VgroupId(SNode* pNode);
|
||||
|
@ -69,15 +81,15 @@ static int32_t sysChkFilter__STableName(SNode* pNode);
|
|||
static int32_t sysChkFilter__Uid(SNode* pNode);
|
||||
static int32_t sysChkFilter__Type(SNode* pNode);
|
||||
|
||||
static int32_t sysFilte__DbName(void* pMeta, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__VgroupId(void* pMeta, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__TableName(void* pMeta, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__CreateTime(void* pMeta, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__Ncolumn(void* pMeta, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__Ttl(void* pMeta, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__STableName(void* pMeta, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__Uid(void* pMeta, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__Type(void* pMeta, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__DbName(void* arg, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__VgroupId(void* arg, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__Ttl(void* arg, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__STableName(void* arg, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__Uid(void* arg, SNode* pNode, SArray* result);
|
||||
static int32_t sysFilte__Type(void* arg, SNode* pNode, SArray* result);
|
||||
|
||||
const SSTabFltFuncDef filterDict[] = {
|
||||
{.name = "table_name", .chkFunc = sysChkFilter__TableName, .fltFunc = sysFilte__TableName},
|
||||
|
@ -95,7 +107,7 @@ const SSTabFltFuncDef filterDict[] = {
|
|||
static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result);
|
||||
static int32_t optSysTabFilteImpl(void* arg, SNode* cond, SArray* result);
|
||||
static int32_t optSysCheckOper(SNode* pOpear);
|
||||
static int32_t optSysMergeRslt(SArray* multiRslt, SArray* reslt);
|
||||
static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
|
||||
|
||||
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
||||
|
||||
|
@ -123,29 +135,6 @@ static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
}
|
||||
}
|
||||
|
||||
static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
|
||||
#if 0
|
||||
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
|
||||
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||
SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
|
||||
SArray *tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);
|
||||
|
||||
size_t t = taosArrayGetSize(group);
|
||||
for (int32_t j = 0; j < t; ++j) {
|
||||
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
||||
updateTableQueryInfoForReverseScan(pCheckInfo);
|
||||
|
||||
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
|
||||
// the start check timestamp of tsdbQueryHandle
|
||||
// STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
|
||||
// pTableKeyInfo->lastKey = pCheckInfo->lastKey;
|
||||
//
|
||||
// assert(pCheckInfo->pTable == pTableKeyInfo->pTable);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
|
||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
|
||||
if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
|
||||
|
@ -291,7 +280,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols,
|
||||
static bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols,
|
||||
int32_t numOfRows) {
|
||||
if (pColsAgg == NULL || pFilterNode == NULL) {
|
||||
return true;
|
||||
|
@ -360,13 +349,13 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo
|
|||
// todo handle the slimit info
|
||||
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
|
||||
SLimit* pLimit = &pLimitInfo->limit;
|
||||
const char* id = GET_TASKID(pTaskInfo);
|
||||
|
||||
if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
|
||||
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
|
||||
pLimitInfo->remainOffset -= pBlock->info.rows;
|
||||
pBlock->info.rows = 0;
|
||||
qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset,
|
||||
GET_TASKID(pTaskInfo));
|
||||
qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
|
||||
} else {
|
||||
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
|
||||
pLimitInfo->remainOffset = 0;
|
||||
|
@ -379,22 +368,11 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
|
|||
int32_t keep = pBlock->info.rows - overflowRows;
|
||||
|
||||
blockDataKeepFirstNRows(pBlock, keep);
|
||||
qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
|
||||
qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
}
|
||||
|
||||
static void ensureBlockCapacity(SSDataBlock* pBlock, int32_t capacity) {
|
||||
// keep the value of rows temporarily
|
||||
int32_t rows = pBlock->info.rows;
|
||||
|
||||
pBlock->info.rows = 0;
|
||||
blockDataEnsureCapacity(pBlock, capacity);
|
||||
|
||||
// restore the rows number
|
||||
pBlock->info.rows = rows;
|
||||
}
|
||||
|
||||
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||
uint32_t* status) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -425,24 +403,16 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
|||
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
|
||||
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
|
||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||
ensureBlockCapacity(pBlock, pBlock->info.rows);
|
||||
}
|
||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
|
||||
pCost->skipBlocks += 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||
} else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
|
||||
pCost->loadBlockStatis += 1;
|
||||
loadSMA = true; // mark the operation of load sma;
|
||||
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
|
||||
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||
ensureBlockCapacity(pBlock, pBlock->info.rows);
|
||||
}
|
||||
|
||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
|
@ -492,7 +462,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
|||
return terrno;
|
||||
}
|
||||
|
||||
ensureBlockCapacity(pBlock, pBlock->info.rows);
|
||||
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
|
||||
|
||||
|
@ -536,7 +505,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
|
|||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
||||
SSDataBlock* pBlock, int32_t rows, const char* idStr) {
|
||||
// currently only the tbname pseudo column
|
||||
if (numOfPseudoExpr == 0) {
|
||||
if (numOfPseudoExpr <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -566,7 +535,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
|
|||
|
||||
// this is to handle the tbname
|
||||
if (fmIsScanPseudoColumnFunc(functionId)) {
|
||||
setTbNameColData(pHandle->meta, pBlock, pColInfoData, functionId);
|
||||
setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name);
|
||||
} else { // these are tags
|
||||
STagVal tagVal = {0};
|
||||
tagVal.cid = pExpr->base.pParam[0].pCol->colId;
|
||||
|
@ -602,16 +571,20 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
|
||||
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
|
||||
struct SScalarFuncExecFuncs fpSet = {0};
|
||||
fmGetScalarFuncExecFuncs(functionId, &fpSet);
|
||||
|
||||
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, sizeof(uint64_t), 1);
|
||||
colInfoDataEnsureCapacity(&infoData, 1);
|
||||
size_t len = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
|
||||
char buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(buf, name)
|
||||
|
||||
colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid);
|
||||
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
|
||||
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, len, 1);
|
||||
|
||||
colInfoDataEnsureCapacity(&infoData, 1, false);
|
||||
colDataAppend(&infoData, 0, buf, false);
|
||||
|
||||
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData};
|
||||
SScalarParam param = {.columnData = pColInfoData};
|
||||
|
||||
if (fpSet.process != NULL) {
|
||||
|
@ -642,9 +615,13 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
blockDataCleanup(pBlock);
|
||||
|
||||
SDataBlockInfo* pBInfo = &pBlock->info;
|
||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBInfo->rows, &pBInfo->uid, &pBInfo->window);
|
||||
|
||||
int32_t rows = 0;
|
||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &rows, &pBInfo->uid, &pBInfo->window);
|
||||
|
||||
blockDataEnsureCapacity(pBlock, rows); // todo remove it latter
|
||||
pBInfo->rows = rows;
|
||||
|
||||
ASSERT(pBInfo->uid != 0);
|
||||
pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid);
|
||||
|
@ -679,7 +656,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
|
@ -692,7 +669,6 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
|||
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||
if (p != NULL) {
|
||||
ASSERT(p->info.uid != 0);
|
||||
return p;
|
||||
}
|
||||
|
||||
|
@ -746,7 +722,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* result = doTableScanGroup(pOperator);
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
if (result) {
|
||||
return result;
|
||||
}
|
||||
|
@ -784,7 +760,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
SSDataBlock* result = doTableScanGroup(pOperator);
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
if (result != NULL) {
|
||||
ASSERT(result->info.uid != 0);
|
||||
return result;
|
||||
|
@ -808,7 +784,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
||||
pInfo->scanTimes = 0;
|
||||
|
||||
result = doTableScanGroup(pOperator);
|
||||
result = doGroupedTableScan(pOperator);
|
||||
if (result != NULL) {
|
||||
return result;
|
||||
}
|
||||
|
@ -851,24 +827,25 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
goto _error;
|
||||
}
|
||||
|
||||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||
SScanPhysiNode* pScanNode = &pTableScanNode->scan;
|
||||
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
|
||||
int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
|
||||
&pInfo->matchInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
|
||||
initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->limitInfo);
|
||||
code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
||||
if (pScanNode->pScanPseudoCols != NULL) {
|
||||
SExprSupp* pSup = &pInfo->pseudoSup;
|
||||
pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
|
||||
pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
|
||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
|
||||
}
|
||||
|
||||
|
@ -879,9 +856,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pInfo->sample.seed = taosGetTimestampSec();
|
||||
|
||||
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||
|
||||
pInfo->pFilterNode = pScanNode->node.pConditions;
|
||||
if (pInfo->pFilterNode != NULL) {
|
||||
code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -988,7 +968,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
|
||||
int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize,
|
||||
int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, (int32_t*)&blockDistInfo.rowSize,
|
||||
GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -1007,12 +987,10 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
|||
tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
|
||||
varDataSetLen(p, len);
|
||||
|
||||
blockDataEnsureCapacity(pBlock, 1);
|
||||
colDataAppend(pColInfo, 0, p, false);
|
||||
taosMemoryFree(p);
|
||||
|
||||
pBlock->info.rows = 1;
|
||||
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return pBlock;
|
||||
}
|
||||
|
@ -1078,7 +1056,9 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
|
|||
|
||||
pInfo->readHandle = *readHandle;
|
||||
pInfo->uid = pBlockScanNode->suid;
|
||||
|
||||
pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);
|
||||
blockDataEnsureCapacity(pInfo->pResBlock, 1);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
|
||||
|
@ -2430,7 +2410,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
if (pHandle->initTableReader) {
|
||||
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
||||
pTSInfo->dataReader = NULL;
|
||||
int32_t code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL);
|
||||
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
destroyTableScanOperatorInfo(pTableScanOp);
|
||||
|
@ -4098,12 +4078,14 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
|||
goto _error;
|
||||
}
|
||||
|
||||
SScanPhysiNode* pScanNode = &pScanPhyNode->scan;
|
||||
|
||||
SScanPhysiNode* pScanNode = &pScanPhyNode->scan;
|
||||
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t num = 0;
|
||||
int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->accountId = pScanPhyNode->accountId;
|
||||
pInfo->pUser = taosMemoryStrDup((void*)pUser);
|
||||
|
@ -4140,7 +4122,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
|||
return pOperator;
|
||||
|
||||
_error:
|
||||
taosMemoryFreeClear(pInfo);
|
||||
if (pInfo != NULL) {
|
||||
destroySysScanOperator(pInfo);
|
||||
}
|
||||
taosMemoryFreeClear(pOperator);
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
@ -4244,16 +4228,15 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
|
||||
SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t num = 0;
|
||||
int32_t numOfExprs = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
||||
int32_t code =
|
||||
extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
||||
int32_t num = 0;
|
||||
code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -4285,21 +4268,9 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
|
||||
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) {
|
||||
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
|
||||
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, i);
|
||||
STsdbReader* pReader = NULL;
|
||||
tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr);
|
||||
taosArrayPush(arrayReader, &pReader);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// todo refactor
|
||||
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
||||
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
|
||||
SSDataBlock* pBlock, uint32_t* status) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
|
||||
|
@ -4334,12 +4305,11 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
|||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||
} else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
|
||||
pCost->loadBlockStatis += 1;
|
||||
|
||||
bool allColumnsHaveAgg = true;
|
||||
SColumnDataAgg** pColAgg = NULL;
|
||||
// STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
|
||||
|
||||
if (allColumnsHaveAgg == true) {
|
||||
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
|
@ -4388,13 +4358,12 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
|||
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
||||
|
||||
// currently only the tbname pseudo column
|
||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||
int32_t code =
|
||||
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
|
||||
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||
|
||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
||||
pBlock->info.rows, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
if (pTableScanInfo->pFilterNode != NULL) {
|
||||
|
@ -4416,13 +4385,6 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct STableMergeScanSortSourceParam {
|
||||
SOperatorInfo* pOperator;
|
||||
int32_t readerIdx;
|
||||
uint64_t uid;
|
||||
SSDataBlock* inputBlock;
|
||||
} STableMergeScanSortSourceParam;
|
||||
|
||||
static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||
STableMergeScanSortSourceParam* source = param;
|
||||
SOperatorInfo* pOperator = source->pOperator;
|
||||
|
@ -4433,7 +4395,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
STableMergeScanInfo* pTableScanInfo = pOperator->info;
|
||||
|
||||
SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx);
|
||||
|
||||
blockDataCleanup(pBlock);
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
@ -4443,13 +4404,13 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
|
||||
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo));
|
||||
if (code != 0) {
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
STsdbReader* reader = pInfo->pReader;
|
||||
while (tsdbNextDataBlock(reader)) {
|
||||
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
||||
// process this data block based on the probabilities
|
||||
|
@ -4472,9 +4433,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
}
|
||||
|
||||
uint32_t status = 0;
|
||||
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status);
|
||||
code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
// current block is filter out according to filter condition, continue load the next block
|
||||
|
@ -4482,71 +4443,21 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
continue;
|
||||
}
|
||||
|
||||
pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid);
|
||||
pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid);
|
||||
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows;
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
tsdbReaderClose(pInfo->pReader);
|
||||
pInfo->pReader = NULL;
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
tsdbReaderClose(pInfo->pReader);
|
||||
pInfo->pReader = NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* getTableDataBlock(void* param) {
|
||||
STableMergeScanSortSourceParam* source = param;
|
||||
SOperatorInfo* pOperator = source->pOperator;
|
||||
int32_t readerIdx = source->readerIdx;
|
||||
SSDataBlock* pBlock = source->inputBlock;
|
||||
STableMergeScanInfo* pTableScanInfo = pOperator->info;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
blockDataCleanup(pBlock);
|
||||
|
||||
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
|
||||
while (tsdbNextDataBlock(reader)) {
|
||||
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
||||
// process this data block based on the probabilities
|
||||
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
|
||||
if (!processThisBlock) {
|
||||
continue;
|
||||
}
|
||||
|
||||
blockDataCleanup(pBlock);
|
||||
|
||||
int32_t rows = 0;
|
||||
tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window);
|
||||
blockDataEnsureCapacity(pBlock, rows);
|
||||
pBlock->info.rows = rows;
|
||||
|
||||
uint32_t status = 0;
|
||||
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
|
||||
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
// current block is filter out according to filter condition, continue load the next block
|
||||
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid);
|
||||
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
||||
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
|
||||
int32_t tsTargetSlotId = 0;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
|
||||
|
@ -4675,7 +4586,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
blockDataEnsureCapacity(pResBlock, capacity);
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
|
@ -4781,11 +4691,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
typedef struct STableMergeScanExecInfo {
|
||||
SFileBlockLoadRecorder blockRecorder;
|
||||
SSortExecInfo sortExecInfo;
|
||||
} STableMergeScanExecInfo;
|
||||
|
||||
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||
ASSERT(pOptr != NULL);
|
||||
// TODO: merge these two info into one struct
|
||||
|
@ -4800,18 +4705,6 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
|
||||
const STableKeyInfo* info1 = p1;
|
||||
const STableKeyInfo* info2 = p2;
|
||||
if (info1->groupId < info2->groupId) {
|
||||
return -1;
|
||||
} else if (info1->groupId > info2->groupId) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
|
||||
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
||||
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
|
||||
|
@ -4825,6 +4718,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
int32_t numOfCols = 0;
|
||||
int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
|
||||
&pInfo->matchInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4849,7 +4745,10 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
pInfo->tableListInfo = pTableListInfo;
|
||||
pInfo->scanFlag = MAIN_SCAN;
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||
|
||||
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
|
||||
|
||||
pInfo->pSortInfo = generateSortByTsInfo(pInfo->matchInfo.pList, pInfo->cond.order);
|
||||
|
@ -4866,7 +4765,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
pOperator->info = pInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL,
|
||||
destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo);
|
||||
|
|
|
@ -20,7 +20,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator);
|
|||
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
|
||||
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
|
||||
|
||||
static void destroyOrderOperatorInfo(void* param);
|
||||
static void destroySortOperatorInfo(void* param);
|
||||
|
||||
// todo add limit/offset impl
|
||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
|
||||
|
@ -64,7 +64,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
|||
// TODO dynamic set the available sort buffer
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, getExplainExecInfo);
|
||||
createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroySortOperatorInfo, getExplainExecInfo);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -249,7 +249,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
|||
return blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL;
|
||||
}
|
||||
|
||||
void destroyOrderOperatorInfo(void* param) {
|
||||
void destroySortOperatorInfo(void* param) {
|
||||
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
|
||||
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
||||
|
||||
|
@ -627,10 +627,12 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
|
|||
|
||||
}
|
||||
|
||||
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
||||
SArray* pColMatchInfo, SOperatorInfo* pOperator) {
|
||||
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo,
|
||||
SOperatorInfo* pOperator) {
|
||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
int32_t capacity = pOperator->resultInfo.capacity;
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
blockDataCleanup(pDataBlock);
|
||||
|
||||
|
@ -640,7 +642,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
}
|
||||
|
||||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
while (1) {
|
||||
doGetSortedBlockData(pInfo, pHandle, capacity, p);
|
||||
if (p->info.rows == 0) {
|
||||
|
@ -656,8 +657,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
}
|
||||
|
||||
if (p->info.rows > 0) {
|
||||
blockDataEnsureCapacity(pDataBlock, p->info.rows);
|
||||
|
||||
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||
|
@ -692,13 +691,13 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
|
|||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes,
|
||||
pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pOperator);
|
||||
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator);
|
||||
if (pBlock != NULL) {
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
} else {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
|
@ -742,12 +741,11 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
|||
|
||||
initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
|
||||
pInfo->binfo.pRes = createResDataBlock(pDescNode);
|
||||
|
||||
int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
|
||||
ASSERT(rowSize < 100 * 1024 * 1024);
|
||||
|
||||
SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
|
||||
int32_t numOfOutputCols = 0;
|
||||
|
||||
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
|
||||
&pInfo->matchInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -756,10 +754,12 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
|||
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pInfo->groupSort = pMergePhyNode->groupSort;
|
||||
pInfo->pSortInfo = pSortInfo;
|
||||
pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
|
||||
pInfo->pInputBlock = pInputBlock;
|
||||
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
|
||||
|
@ -781,11 +781,11 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
|||
return pOperator;
|
||||
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (pInfo != NULL) {
|
||||
destroyMultiwayMergeOperatorInfo(pInfo);
|
||||
}
|
||||
|
||||
pTaskInfo->code = code;
|
||||
taosMemoryFree(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -489,7 +489,7 @@ EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWind
|
|||
if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) {
|
||||
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||
}
|
||||
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
||||
return FUNC_DATA_REQUIRED_SMA_LOAD;
|
||||
}
|
||||
|
||||
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
|
@ -1103,7 +1103,7 @@ int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
||||
return FUNC_DATA_REQUIRED_SMA_LOAD;
|
||||
}
|
||||
|
||||
bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
|
|
|
@ -86,8 +86,7 @@ void sifAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *sl
|
|||
SColumnInfoData idata = {0};
|
||||
idata.info = *colInfo;
|
||||
|
||||
colInfoDataEnsureCapacity(&idata, rows);
|
||||
|
||||
colInfoDataEnsureCapacity(&idata, rows, true);
|
||||
taosArrayPush(res->pDataBlock, &idata);
|
||||
|
||||
*dataBlockId = taosArrayGetSize(pBlockList) - 1;
|
||||
|
|
|
@ -279,7 +279,7 @@ static EFuncDataRequired scanPathOptPromoteDataRequired(EFuncDataRequired l, EFu
|
|||
switch (l) {
|
||||
case FUNC_DATA_REQUIRED_DATA_LOAD:
|
||||
return l;
|
||||
case FUNC_DATA_REQUIRED_STATIS_LOAD:
|
||||
case FUNC_DATA_REQUIRED_SMA_LOAD:
|
||||
return FUNC_DATA_REQUIRED_DATA_LOAD == r ? r : l;
|
||||
case FUNC_DATA_REQUIRED_NOT_LOAD:
|
||||
return FUNC_DATA_REQUIRED_FILTEROUT == r ? l : r;
|
||||
|
|
|
@ -14,7 +14,6 @@ target_link_libraries(scalar
|
|||
PRIVATE nodes
|
||||
PRIVATE function
|
||||
PRIVATE qcom
|
||||
PRIVATE vnode
|
||||
)
|
||||
|
||||
if(${BUILD_TEST})
|
||||
|
|
|
@ -49,7 +49,7 @@ int32_t sclCreateColumnInfoData(SDataType *pType, int32_t numOfRows, SScalarPara
|
|||
pColumnData->info.scale = pType->scale;
|
||||
pColumnData->info.precision = pType->precision;
|
||||
|
||||
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows);
|
||||
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pColumnData);
|
||||
|
@ -70,7 +70,7 @@ int32_t sclConvertValueToSclParam(SValueNode* pValueNode, SScalarParam* out, int
|
|||
|
||||
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
|
||||
|
||||
colInfoDataEnsureCapacity(out->columnData, 1);
|
||||
colInfoDataEnsureCapacity(out->columnData, 1, true);
|
||||
code = vectorConvertSingleColImpl(&in, out, overflow, -1, -1);
|
||||
sclFreeParam(&in);
|
||||
|
||||
|
@ -88,7 +88,7 @@ int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockL
|
|||
pLeft->numOfRows = pb->info.rows;
|
||||
|
||||
if (pDst->numOfRows < pb->info.rows) {
|
||||
colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows);
|
||||
colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows, true);
|
||||
}
|
||||
|
||||
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
|
||||
|
@ -1604,7 +1604,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
|
|||
if (1 == res->numOfRows) {
|
||||
SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList));
|
||||
} else {
|
||||
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows);
|
||||
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true);
|
||||
colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL);
|
||||
pDst->numOfRows = res->numOfRows;
|
||||
pDst->numOfQualified = res->numOfQualified;
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
#include "tdatablock.h"
|
||||
#include "tjson.h"
|
||||
#include "ttime.h"
|
||||
#include "vnode.h"
|
||||
|
||||
typedef float (*_float_fn)(float);
|
||||
typedef double (*_double_fn)(double);
|
||||
|
@ -1718,12 +1717,9 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
|||
|
||||
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
ASSERT(inputNum == 1);
|
||||
char* p = colDataGetVarData(pInput->columnData, 0);
|
||||
|
||||
uint64_t uid = *(uint64_t *)colDataGetData(pInput->columnData, 0);
|
||||
|
||||
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
metaGetTableNameByUid(pInput->param, uid, str);
|
||||
colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, str, pInput->numOfRows);
|
||||
colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows);
|
||||
pOutput->numOfRows += pInput->numOfRows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s
|
|||
|
||||
SColumnInfoData idata = {0};
|
||||
idata.info = *colInfo;
|
||||
colInfoDataEnsureCapacity(&idata, rows);
|
||||
colInfoDataEnsureCapacity(&idata, rows, true);
|
||||
|
||||
blockDataAppendColInfo(res, &idata);
|
||||
|
||||
|
@ -104,7 +104,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s
|
|||
SSDataBlock *res = *(SSDataBlock **)taosArrayGetLast(pBlockList);
|
||||
SColumnInfoData idata = {0};
|
||||
idata.info = *colInfo;
|
||||
colInfoDataEnsureCapacity(&idata, rows);
|
||||
colInfoDataEnsureCapacity(&idata, rows, true);
|
||||
blockDataAppendColInfo(res, &idata);
|
||||
|
||||
*dataBlockId = taosArrayGetSize(pBlockList) - 1;
|
||||
|
@ -146,12 +146,12 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
|
|||
SSDataBlock *res = createDataBlock();
|
||||
for (int32_t i = 0; i < 2; ++i) {
|
||||
SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_INT, 10, i + 1);
|
||||
colInfoDataEnsureCapacity(&idata, rowNum);
|
||||
colInfoDataEnsureCapacity(&idata, rowNum, true);
|
||||
blockDataAppendColInfo(res, &idata);
|
||||
}
|
||||
|
||||
SColumnInfoData idata = createColumnInfoData(dataType, dataBytes, 3);
|
||||
colInfoDataEnsureCapacity(&idata, rowNum);
|
||||
colInfoDataEnsureCapacity(&idata, rowNum, true);
|
||||
blockDataAppendColInfo(res, &idata);
|
||||
res->info.capacity = rowNum;
|
||||
|
||||
|
@ -175,7 +175,7 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
|
|||
|
||||
int32_t idx = taosArrayGetSize(res->pDataBlock);
|
||||
SColumnInfoData idata = createColumnInfoData(dataType, dataBytes, 1 + idx);
|
||||
colInfoDataEnsureCapacity(&idata, rowNum);
|
||||
colInfoDataEnsureCapacity(&idata, rowNum, true);
|
||||
|
||||
res->info.capacity = rowNum;
|
||||
blockDataAppendColInfo(res, &idata);
|
||||
|
@ -2022,7 +2022,7 @@ void scltMakeDataBlock(SScalarParam **pInput, int32_t type, void *pVal, int32_t
|
|||
input->numOfRows = num;
|
||||
|
||||
input->columnData->info = createColumnInfo(0, type, bytes);
|
||||
colInfoDataEnsureCapacity(input->columnData, num);
|
||||
colInfoDataEnsureCapacity(input->columnData, num, true);
|
||||
|
||||
if (setVal) {
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
|
|
|
@ -15,7 +15,7 @@ class TDTestCase:
|
|||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
tdSql.init(conn.cursor(), True)
|
||||
self.dbname = "db"
|
||||
|
||||
def __cast_to_bigint(self, col_name, tbname):
|
||||
|
|
Loading…
Reference in New Issue