refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2022-11-04 18:46:48 +08:00
parent 9c33d10ad9
commit 03f4918b3b
17 changed files with 133 additions and 232 deletions

View File

@ -215,7 +215,7 @@ size_t blockDataGetSerialMetaSize(uint32_t numOfCols);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);

View File

@ -241,13 +241,6 @@ struct STag {
memcpy(varDataVal(x), (str), __len); \ memcpy(varDataVal(x), (str), __len); \
} while (0); } while (0);
#define STR_TO_NET_VARSTR(x, str) \
do { \
VarDataLenT __len = (VarDataLenT)strlen(str); \
*(VarDataLenT *)(x) = htons(__len); \
memcpy(varDataVal(x), (str), __len); \
} while (0);
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ #define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \
do { \ do { \
char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \ char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \

View File

@ -182,7 +182,7 @@ struct SScalarParam {
}; };
void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell); void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell);
int32_t getNumOfResult(SqlFunctionCtx *pCtx, int32_t num, SSDataBlock *pResBlock); //int32_t getNumOfResult(SqlFunctionCtx *pCtx, int32_t num, SSDataBlock *pResBlock);
bool isRowEntryCompleted(struct SResultRowEntryInfo *pEntry); bool isRowEntryCompleted(struct SResultRowEntryInfo *pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo *pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo *pEntry);

View File

@ -224,7 +224,7 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc
typedef enum EFuncDataRequired { typedef enum EFuncDataRequired {
FUNC_DATA_REQUIRED_DATA_LOAD = 1, FUNC_DATA_REQUIRED_DATA_LOAD = 1,
FUNC_DATA_REQUIRED_STATIS_LOAD, FUNC_DATA_REQUIRED_SMA_LOAD,
FUNC_DATA_REQUIRED_NOT_LOAD, FUNC_DATA_REQUIRED_NOT_LOAD,
FUNC_DATA_REQUIRED_FILTEROUT, FUNC_DATA_REQUIRED_FILTEROUT,
} EFuncDataRequired; } EFuncDataRequired;

View File

@ -1143,7 +1143,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
} }
} }
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows) { static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) {
ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows); ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows);
if (numOfRows < pBlockInfo->capacity) { if (numOfRows < pBlockInfo->capacity) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1182,8 +1182,10 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
pColumn->pData = tmp; pColumn->pData = tmp;
if (clearPayload) {
memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1203,9 +1205,9 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
} }
} }
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) { int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload) {
SDataBlockInfo info = {0}; SDataBlockInfo info = {0};
return doEnsureCapacity(pColumn, &info, numOfRows); return doEnsureCapacity(pColumn, &info, numOfRows, clearPayload);
} }
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
@ -1221,7 +1223,7 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
code = doEnsureCapacity(p, &pDataBlock->info, numOfRows); code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, false);
if (code) { if (code) {
return code; return code;
} }

View File

@ -644,6 +644,13 @@ typedef struct SSttBlockLoadInfo {
int16_t *colIds; int16_t *colIds;
int32_t numOfCols; int32_t numOfCols;
bool sttBlockLoaded; bool sttBlockLoaded;
// keep the last access position, this position may be used to reduce the binary times for
// starting last block data for a new table
struct {
int32_t blockIndex;
int32_t rowIndex;
} prevEndPos;
} SSttBlockLoadInfo; } SSttBlockLoadInfo;
typedef struct SMergeTree { typedef struct SMergeTree {

View File

@ -536,10 +536,10 @@ typedef struct SSysTableScanInfo {
} SSysTableScanInfo; } SSysTableScanInfo;
typedef struct SBlockDistInfo { typedef struct SBlockDistInfo {
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
STsdbReader* pHandle; STsdbReader* pHandle;
SReadHandle readHandle; SReadHandle readHandle;
uint64_t uid; // table uid uint64_t uid; // table uid
} SBlockDistInfo; } SBlockDistInfo;
// todo remove this // todo remove this
@ -550,7 +550,6 @@ typedef struct SOptrBasicInfo {
} SOptrBasicInfo; } SOptrBasicInfo;
typedef struct SIntervalAggOperatorInfo { typedef struct SIntervalAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function SExprSupp scalarSupp; // supporter for perform scalar function
@ -571,7 +570,6 @@ typedef struct SIntervalAggOperatorInfo {
typedef struct SMergeAlignedIntervalAggOperatorInfo { typedef struct SMergeAlignedIntervalAggOperatorInfo {
SIntervalAggOperatorInfo* intervalAggOperatorInfo; SIntervalAggOperatorInfo* intervalAggOperatorInfo;
// bool hasGroupId;
uint64_t groupId; // current groupId uint64_t groupId; // current groupId
int64_t curTs; // current ts int64_t curTs; // current ts
SSDataBlock* prefetchedBlock; SSDataBlock* prefetchedBlock;
@ -839,10 +837,6 @@ typedef struct SSortOperatorInfo {
SNode* pCondition; SNode* pCondition;
} SSortOperatorInfo; } SSortOperatorInfo;
typedef struct STagFilterOperatorInfo {
SOptrBasicInfo binfo;
} STagFilterOperatorInfo;
typedef struct SJoinOperatorInfo { typedef struct SJoinOperatorInfo {
SSDataBlock* pRes; SSDataBlock* pRes;
int32_t joinType; int32_t joinType;
@ -907,7 +901,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
void cleanupAggSup(SAggSupporter* pAggSup); void cleanupAggSup(SAggSupporter* pAggSup);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId); void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts); int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts);
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts); int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts);

View File

@ -170,14 +170,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
pRes->info.rows = 1; pRes->info.rows = 1;
if (pInfo->pseudoExprSup.numOfExprs > 0) { SExprSupp* pSup = &pInfo->pseudoExprSup;
SExprSupp* pSup = &pInfo->pseudoExprSup; int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows, pRes->info.rows, GET_TASKID(pTaskInfo));
GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code;
pTaskInfo->code = code; return NULL;
return NULL;
}
} }
pRes->info.groupId = getTableGroupId(pTableList, pRes->info.uid); pRes->info.groupId = getTableGroupId(pTableList, pRes->info.uid);

View File

@ -379,7 +379,7 @@ static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarPara
pColumnData->info.scale = pType->scale; pColumnData->info.scale = pType->scale;
pColumnData->info.precision = pType->precision; pColumnData->info.precision = pType->precision;
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows); int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
taosMemoryFree(pColumnData); taosMemoryFree(pColumnData);
@ -1828,8 +1828,6 @@ static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
} }
static int32_t sortTableGroup(STableListInfo* pTableListInfo) { static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
int32_t size = taosArrayGetSize(pTableListInfo->pTableList); int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
@ -1851,6 +1849,11 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList); pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
if (pTableListInfo->groupOffset == NULL) {
taosArrayDestroy(pList);
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups); memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
taosArrayDestroy(pList); taosArrayDestroy(pList);
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;

View File

@ -87,7 +87,7 @@ static void releaseQueryBuf(size_t numOfTables);
static void destroyFillOperatorInfo(void* param); static void destroyFillOperatorInfo(void* param);
static void destroyProjectOperatorInfo(void* param); static void destroyProjectOperatorInfo(void* param);
static void destroyOrderOperatorInfo(void* param); static void destroySortOperatorInfo(void* param);
static void destroyAggOperatorInfo(void* param); static void destroyAggOperatorInfo(void* param);
static void destroyIntervalOperatorInfo(void* param); static void destroyIntervalOperatorInfo(void* param);
@ -322,7 +322,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP; pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
pColData->info.bytes = sizeof(int64_t); pColData->info.bytes = sizeof(int64_t);
colInfoDataEnsureCapacity(pColData, 5); colInfoDataEnsureCapacity(pColData, 5, false);
colDataAppendInt64(pColData, 0, &pQueryWindow->skey); colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
colDataAppendInt64(pColData, 1, &pQueryWindow->ekey); colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);
@ -439,7 +439,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
pColInfo = pInput->pData[paramIndex]; pColInfo = pInput->pData[paramIndex];
} }
colInfoDataEnsureCapacity(pColInfo, numOfRows); colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
int8_t type = pFuncParam->param.nType; int8_t type = pFuncParam->param.nType;
if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) { if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
@ -1047,8 +1047,6 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pCol
SFilterInfo* filter = pFilterInfo; SFilterInfo* filter = pFilterInfo;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
// pError("start filter");
// todo move to the initialization function // todo move to the initialization function
int32_t code = 0; int32_t code = 0;
bool needFree = false; bool needFree = false;

View File

@ -58,6 +58,18 @@ typedef struct {
void* pVnode; void* pVnode;
} SSTabFltArg; } SSTabFltArg;
typedef struct STableMergeScanExecInfo {
SFileBlockLoadRecorder blockRecorder;
SSortExecInfo sortExecInfo;
} STableMergeScanExecInfo;
typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator;
int32_t readerIdx;
uint64_t uid;
SSDataBlock* inputBlock;
} STableMergeScanSortSourceParam;
static int32_t sysChkFilter__Comm(SNode* pNode); static int32_t sysChkFilter__Comm(SNode* pNode);
static int32_t sysChkFilter__DBName(SNode* pNode); static int32_t sysChkFilter__DBName(SNode* pNode);
static int32_t sysChkFilter__VgroupId(SNode* pNode); static int32_t sysChkFilter__VgroupId(SNode* pNode);
@ -69,15 +81,15 @@ static int32_t sysChkFilter__STableName(SNode* pNode);
static int32_t sysChkFilter__Uid(SNode* pNode); static int32_t sysChkFilter__Uid(SNode* pNode);
static int32_t sysChkFilter__Type(SNode* pNode); static int32_t sysChkFilter__Type(SNode* pNode);
static int32_t sysFilte__DbName(void* pMeta, SNode* pNode, SArray* result); static int32_t sysFilte__DbName(void* arg, SNode* pNode, SArray* result);
static int32_t sysFilte__VgroupId(void* pMeta, SNode* pNode, SArray* result); static int32_t sysFilte__VgroupId(void* arg, SNode* pNode, SArray* result);
static int32_t sysFilte__TableName(void* pMeta, SNode* pNode, SArray* result); static int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result);
static int32_t sysFilte__CreateTime(void* pMeta, SNode* pNode, SArray* result); static int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result);
static int32_t sysFilte__Ncolumn(void* pMeta, SNode* pNode, SArray* result); static int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result);
static int32_t sysFilte__Ttl(void* pMeta, SNode* pNode, SArray* result); static int32_t sysFilte__Ttl(void* arg, SNode* pNode, SArray* result);
static int32_t sysFilte__STableName(void* pMeta, SNode* pNode, SArray* result); static int32_t sysFilte__STableName(void* arg, SNode* pNode, SArray* result);
static int32_t sysFilte__Uid(void* pMeta, SNode* pNode, SArray* result); static int32_t sysFilte__Uid(void* arg, SNode* pNode, SArray* result);
static int32_t sysFilte__Type(void* pMeta, SNode* pNode, SArray* result); static int32_t sysFilte__Type(void* arg, SNode* pNode, SArray* result);
const SSTabFltFuncDef filterDict[] = { const SSTabFltFuncDef filterDict[] = {
{.name = "table_name", .chkFunc = sysChkFilter__TableName, .fltFunc = sysFilte__TableName}, {.name = "table_name", .chkFunc = sysChkFilter__TableName, .fltFunc = sysFilte__TableName},
@ -95,7 +107,7 @@ const SSTabFltFuncDef filterDict[] = {
static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result); static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result);
static int32_t optSysTabFilteImpl(void* arg, SNode* cond, SArray* result); static int32_t optSysTabFilteImpl(void* arg, SNode* cond, SArray* result);
static int32_t optSysCheckOper(SNode* pOpear); static int32_t optSysCheckOper(SNode* pOpear);
static int32_t optSysMergeRslt(SArray* multiRslt, SArray* reslt); static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
static bool processBlockWithProbability(const SSampleExecInfo* pInfo); static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
@ -123,29 +135,6 @@ static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
} }
} }
static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
#if 0
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
SArray *tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);
size_t t = taosArrayGetSize(group);
for (int32_t j = 0; j < t; ++j) {
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
updateTableQueryInfoForReverseScan(pCheckInfo);
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
// the start check timestamp of tsdbQueryHandle
// STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
// pTableKeyInfo->lastKey = pCheckInfo->lastKey;
//
// assert(pCheckInfo->pTable == pTableKeyInfo->pTable);
}
}
#endif
}
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) { static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order); int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') { if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
@ -291,7 +280,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols, static bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols,
int32_t numOfRows) { int32_t numOfRows) {
if (pColsAgg == NULL || pFilterNode == NULL) { if (pColsAgg == NULL || pFilterNode == NULL) {
return true; return true;
@ -360,13 +349,13 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo
// todo handle the slimit info // todo handle the slimit info
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) { void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
SLimit* pLimit = &pLimitInfo->limit; SLimit* pLimit = &pLimitInfo->limit;
const char* id = GET_TASKID(pTaskInfo);
if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) { if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
if (pLimitInfo->remainOffset >= pBlock->info.rows) { if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows; pLimitInfo->remainOffset -= pBlock->info.rows;
pBlock->info.rows = 0; pBlock->info.rows = 0;
qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
GET_TASKID(pTaskInfo));
} else { } else {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
pLimitInfo->remainOffset = 0; pLimitInfo->remainOffset = 0;
@ -379,7 +368,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
int32_t keep = pBlock->info.rows - overflowRows; int32_t keep = pBlock->info.rows - overflowRows;
blockDataKeepFirstNRows(pBlock, keep); blockDataKeepFirstNRows(pBlock, keep);
qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
} }
@ -429,10 +418,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
if (pTableScanInfo->pseudoSup.numOfExprs > 0) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
ensureBlockCapacity(pBlock, pBlock->info.rows); ensureBlockCapacity(pBlock, pBlock->info.rows);
} }
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
pCost->skipBlocks += 1; pCost->skipBlocks += 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
pCost->loadBlockStatis += 1; pCost->loadBlockStatis += 1;
loadSMA = true; // mark the operation of load sma; loadSMA = true; // mark the operation of load sma;
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
@ -536,7 +526,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock, int32_t rows, const char* idStr) { SSDataBlock* pBlock, int32_t rows, const char* idStr) {
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (numOfPseudoExpr == 0) { if (numOfPseudoExpr <= 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -566,7 +556,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
// this is to handle the tbname // this is to handle the tbname
if (fmIsScanPseudoColumnFunc(functionId)) { if (fmIsScanPseudoColumnFunc(functionId)) {
setTbNameColData(pHandle->meta, pBlock, pColInfoData, functionId); setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name);
} else { // these are tags } else { // these are tags
STagVal tagVal = {0}; STagVal tagVal = {0};
tagVal.cid = pExpr->base.pParam[0].pCol->colId; tagVal.cid = pExpr->base.pParam[0].pCol->colId;
@ -602,16 +592,20 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) { void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
struct SScalarFuncExecFuncs fpSet = {0}; struct SScalarFuncExecFuncs fpSet = {0};
fmGetScalarFuncExecFuncs(functionId, &fpSet); fmGetScalarFuncExecFuncs(functionId, &fpSet);
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, sizeof(uint64_t), 1); size_t len = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
colInfoDataEnsureCapacity(&infoData, 1); char buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(buf, name)
colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, len, 1);
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
colInfoDataEnsureCapacity(&infoData, 1, false);
colDataAppend(&infoData, 0, buf, false);
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData}; SScalarParam param = {.columnData = pColInfoData};
if (fpSet.process != NULL) { if (fpSet.process != NULL) {
@ -679,7 +673,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -746,7 +740,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
while (1) { while (1) {
SSDataBlock* result = doTableScanGroup(pOperator); SSDataBlock* result = doGroupedTableScan(pOperator);
if (result) { if (result) {
return result; return result;
} }
@ -784,7 +778,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
} }
} }
SSDataBlock* result = doTableScanGroup(pOperator); SSDataBlock* result = doGroupedTableScan(pOperator);
if (result != NULL) { if (result != NULL) {
ASSERT(result->info.uid != 0); ASSERT(result->info.uid != 0);
return result; return result;
@ -808,7 +802,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
tsdbReaderReset(pInfo->dataReader, &pInfo->cond); tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
result = doTableScanGroup(pOperator); result = doGroupedTableScan(pOperator);
if (result != NULL) { if (result != NULL) {
return result; return result;
} }
@ -851,24 +845,25 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto _error; goto _error;
} }
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; SScanPhysiNode* pScanNode = &pTableScanNode->scan;
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0; int32_t numOfCols = 0;
int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
&pInfo->matchInfo); &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->limitInfo);
code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
if (pTableScanNode->scan.pScanPseudoCols != NULL) { if (pScanNode->pScanPseudoCols != NULL) {
SExprSupp* pSup = &pInfo->pseudoSup; SExprSupp* pSup = &pInfo->pseudoSup;
pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
} }
@ -880,7 +875,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createResDataBlock(pDescNode);
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; pInfo->pFilterNode = pScanNode->node.pConditions;
if (pInfo->pFilterNode != NULL) { if (pInfo->pFilterNode != NULL) {
code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
@ -988,7 +983,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize, int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, (int32_t*)&blockDistInfo.rowSize,
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
@ -2430,7 +2425,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle->initTableReader) { if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->dataReader = NULL; pTSInfo->dataReader = NULL;
int32_t code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL); code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL);
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;
destroyTableScanOperatorInfo(pTableScanOp); destroyTableScanOperatorInfo(pTableScanOp);
@ -4098,12 +4093,14 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
goto _error; goto _error;
} }
SScanPhysiNode* pScanNode = &pScanPhyNode->scan; SScanPhysiNode* pScanNode = &pScanPhyNode->scan;
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
int32_t num = 0; int32_t num = 0;
int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->accountId = pScanPhyNode->accountId; pInfo->accountId = pScanPhyNode->accountId;
pInfo->pUser = taosMemoryStrDup((void*)pUser); pInfo->pUser = taosMemoryStrDup((void*)pUser);
@ -4140,7 +4137,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
return pOperator; return pOperator;
_error: _error:
taosMemoryFreeClear(pInfo); if (pInfo != NULL) {
destroySysScanOperator(pInfo);
}
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL; return NULL;
@ -4244,16 +4243,15 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;
int32_t num = 0;
int32_t numOfExprs = 0; int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
int32_t code = int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); int32_t num = 0;
code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -4285,21 +4283,9 @@ _error:
return NULL; return NULL;
} }
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) {
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, i);
STsdbReader* pReader = NULL;
tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr);
taosArrayPush(arrayReader, &pReader);
}
return TSDB_CODE_SUCCESS;
}
// todo refactor // todo refactor
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { SSDataBlock* pBlock, uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
@ -4334,12 +4320,11 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
pCost->loadBlockStatis += 1; pCost->loadBlockStatis += 1;
bool allColumnsHaveAgg = true; bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL; SColumnDataAgg** pColAgg = NULL;
// STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
if (allColumnsHaveAgg == true) { if (allColumnsHaveAgg == true) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
@ -4388,13 +4373,12 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pTableScanInfo->pseudoSup.numOfExprs > 0) { SExprSupp* pSup = &pTableScanInfo->pseudoSup;
int32_t code =
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); pBlock->info.rows, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
}
} }
if (pTableScanInfo->pFilterNode != NULL) { if (pTableScanInfo->pFilterNode != NULL) {
@ -4416,13 +4400,6 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator;
int32_t readerIdx;
uint64_t uid;
SSDataBlock* inputBlock;
} STableMergeScanSortSourceParam;
static SSDataBlock* getTableDataBlockImpl(void* param) { static SSDataBlock* getTableDataBlockImpl(void* param) {
STableMergeScanSortSourceParam* source = param; STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator; SOperatorInfo* pOperator = source->pOperator;
@ -4433,7 +4410,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
STableMergeScanInfo* pTableScanInfo = pOperator->info; STableMergeScanInfo* pTableScanInfo = pOperator->info;
SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx); SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx);
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
@ -4443,13 +4419,13 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo)); int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo));
if (code != 0) { if (code != 0) {
T_LONG_JMP(pOperator->pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
STsdbReader* reader = pInfo->pReader; STsdbReader* reader = pInfo->pReader;
while (tsdbNextDataBlock(reader)) { while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pOperator->pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
// process this data block based on the probabilities // process this data block based on the probabilities
@ -4472,9 +4448,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
} }
uint32_t status = 0; uint32_t status = 0;
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status); code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pOperator->pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
// current block is filter out according to filter condition, continue load the next block // current block is filter out according to filter condition, continue load the next block
@ -4482,71 +4458,21 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
continue; continue;
} }
pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid); pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows; pOperator->resultInfo.totalRows += pBlock->info.rows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
tsdbReaderClose(pInfo->pReader); tsdbReaderClose(pInfo->pReader);
pInfo->pReader = NULL; pInfo->pReader = NULL;
return pBlock; return pBlock;
} }
tsdbReaderClose(pInfo->pReader); tsdbReaderClose(pInfo->pReader);
pInfo->pReader = NULL; pInfo->pReader = NULL;
return NULL; return NULL;
} }
static SSDataBlock* getTableDataBlock(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
int32_t readerIdx = source->readerIdx;
SSDataBlock* pBlock = source->inputBlock;
STableMergeScanInfo* pTableScanInfo = pOperator->info;
int64_t st = taosGetTimestampUs();
blockDataCleanup(pBlock);
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
if (!processThisBlock) {
continue;
}
blockDataCleanup(pBlock);
int32_t rows = 0;
tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window);
blockDataEnsureCapacity(pBlock, rows);
pBlock->info.rows = rows;
uint32_t status = 0;
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue;
}
pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid);
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
}
return NULL;
}
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
int32_t tsTargetSlotId = 0; int32_t tsTargetSlotId = 0;
for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
@ -4781,11 +4707,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
typedef struct STableMergeScanExecInfo {
SFileBlockLoadRecorder blockRecorder;
SSortExecInfo sortExecInfo;
} STableMergeScanExecInfo;
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
ASSERT(pOptr != NULL); ASSERT(pOptr != NULL);
// TODO: merge these two info into one struct // TODO: merge these two info into one struct
@ -4800,18 +4721,6 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
const STableKeyInfo* info1 = p1;
const STableKeyInfo* info2 = p2;
if (info1->groupId < info2->groupId) {
return -1;
} else if (info1->groupId > info2->groupId) {
return 1;
} else {
return 0;
}
}
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
@ -4825,6 +4734,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
int32_t numOfCols = 0; int32_t numOfCols = 0;
int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
&pInfo->matchInfo); &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -20,7 +20,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator);
static int32_t doOpenSortOperator(SOperatorInfo* pOperator); static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
static void destroyOrderOperatorInfo(void* param); static void destroySortOperatorInfo(void* param);
// todo add limit/offset impl // todo add limit/offset impl
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
@ -64,7 +64,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
// TODO dynamic set the available sort buffer // TODO dynamic set the available sort buffer
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, getExplainExecInfo); createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroySortOperatorInfo, getExplainExecInfo);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -249,7 +249,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
return blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL; return blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL;
} }
void destroyOrderOperatorInfo(void* param) { void destroySortOperatorInfo(void* param) {
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
@ -729,12 +729,11 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = createResDataBlock(pDescNode); pInfo->binfo.pRes = createResDataBlock(pDescNode);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize; int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
ASSERT(rowSize < 100 * 1024 * 1024); ASSERT(rowSize < 100 * 1024 * 1024);
SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
&pInfo->matchInfo); &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -746,7 +745,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->groupSort = pMergePhyNode->groupSort; pInfo->groupSort = pMergePhyNode->groupSort;
pInfo->pSortInfo = pSortInfo; pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
pInfo->pInputBlock = pInputBlock; pInfo->pInputBlock = pInputBlock;
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
@ -768,11 +767,11 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
return pOperator; return pOperator;
_error: _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
if (pInfo != NULL) { if (pInfo != NULL) {
destroyMultiwayMergeOperatorInfo(pInfo); destroyMultiwayMergeOperatorInfo(pInfo);
} }
pTaskInfo->code = code;
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
return NULL; return NULL;
} }

View File

@ -489,7 +489,7 @@ EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWind
if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) { if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) {
return FUNC_DATA_REQUIRED_NOT_LOAD; return FUNC_DATA_REQUIRED_NOT_LOAD;
} }
return FUNC_DATA_REQUIRED_STATIS_LOAD; return FUNC_DATA_REQUIRED_SMA_LOAD;
} }
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
@ -1103,7 +1103,7 @@ int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
} }
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
return FUNC_DATA_REQUIRED_STATIS_LOAD; return FUNC_DATA_REQUIRED_SMA_LOAD;
} }
bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {

View File

@ -279,7 +279,7 @@ static EFuncDataRequired scanPathOptPromoteDataRequired(EFuncDataRequired l, EFu
switch (l) { switch (l) {
case FUNC_DATA_REQUIRED_DATA_LOAD: case FUNC_DATA_REQUIRED_DATA_LOAD:
return l; return l;
case FUNC_DATA_REQUIRED_STATIS_LOAD: case FUNC_DATA_REQUIRED_SMA_LOAD:
return FUNC_DATA_REQUIRED_DATA_LOAD == r ? r : l; return FUNC_DATA_REQUIRED_DATA_LOAD == r ? r : l;
case FUNC_DATA_REQUIRED_NOT_LOAD: case FUNC_DATA_REQUIRED_NOT_LOAD:
return FUNC_DATA_REQUIRED_FILTEROUT == r ? l : r; return FUNC_DATA_REQUIRED_FILTEROUT == r ? l : r;

View File

@ -14,7 +14,6 @@ target_link_libraries(scalar
PRIVATE nodes PRIVATE nodes
PRIVATE function PRIVATE function
PRIVATE qcom PRIVATE qcom
PRIVATE vnode
) )
if(${BUILD_TEST}) if(${BUILD_TEST})

View File

@ -49,7 +49,7 @@ int32_t sclCreateColumnInfoData(SDataType *pType, int32_t numOfRows, SScalarPara
pColumnData->info.scale = pType->scale; pColumnData->info.scale = pType->scale;
pColumnData->info.precision = pType->precision; pColumnData->info.precision = pType->precision;
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows); int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pColumnData); taosMemoryFree(pColumnData);
@ -70,7 +70,7 @@ int32_t sclConvertValueToSclParam(SValueNode* pValueNode, SScalarParam* out, int
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false); colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
colInfoDataEnsureCapacity(out->columnData, 1); colInfoDataEnsureCapacity(out->columnData, 1, true);
code = vectorConvertSingleColImpl(&in, out, overflow, -1, -1); code = vectorConvertSingleColImpl(&in, out, overflow, -1, -1);
sclFreeParam(&in); sclFreeParam(&in);
@ -88,7 +88,7 @@ int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockL
pLeft->numOfRows = pb->info.rows; pLeft->numOfRows = pb->info.rows;
if (pDst->numOfRows < pb->info.rows) { if (pDst->numOfRows < pb->info.rows) {
colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows); colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows, true);
} }
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN); _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
@ -1604,7 +1604,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
if (1 == res->numOfRows) { if (1 == res->numOfRows) {
SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList)); SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList));
} else { } else {
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows); colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true);
colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL); colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL);
pDst->numOfRows = res->numOfRows; pDst->numOfRows = res->numOfRows;
pDst->numOfQualified = res->numOfQualified; pDst->numOfQualified = res->numOfQualified;

View File

@ -6,7 +6,6 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "tjson.h" #include "tjson.h"
#include "ttime.h" #include "ttime.h"
#include "vnode.h"
typedef float (*_float_fn)(float); typedef float (*_float_fn)(float);
typedef double (*_double_fn)(double); typedef double (*_double_fn)(double);
@ -1718,12 +1717,9 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1); ASSERT(inputNum == 1);
char* p = colDataGetVarData(pInput->columnData, 0);
uint64_t uid = *(uint64_t *)colDataGetData(pInput->columnData, 0); colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows);
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
metaGetTableNameByUid(pInput->param, uid, str);
colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, str, pInput->numOfRows);
pOutput->numOfRows += pInput->numOfRows; pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }