[td-11818] refactor.
This commit is contained in:
parent
9800c8d552
commit
ec99edc532
|
@ -582,9 +582,9 @@ typedef struct SOrderOperatorInfo {
|
|||
|
||||
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
|
||||
|
||||
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
|
||||
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
|
||||
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
|
||||
|
||||
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
|
|
|
@ -4835,11 +4835,11 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
|
|||
break;
|
||||
}
|
||||
case OP_TableSeqScan: {
|
||||
pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
|
||||
pRuntimeEnv->proot = createTableSeqScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
|
||||
break;
|
||||
}
|
||||
case OP_DataBlocksOptScan: {
|
||||
pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
|
||||
pRuntimeEnv->proot = createTableScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
|
||||
break;
|
||||
}
|
||||
case OP_TableScan: {
|
||||
|
@ -5162,7 +5162,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
|
||||
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
||||
|
||||
pInfo->pTsdbReadHandle = pTsdbQueryHandle;
|
||||
|
@ -5267,7 +5267,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
|
|||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
|
||||
assert(repeatTime > 0);
|
||||
|
||||
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
||||
|
|
|
@ -274,9 +274,9 @@ typedef struct STaskRuntimeEnv {
|
|||
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
||||
SArray* pResultRowArrayList; // The array list that contains the Result rows
|
||||
char* keyBuf; // window key buffer
|
||||
SResultRowPool*
|
||||
pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
|
||||
// The window result objects pool, all the resultRow Objects are allocated and managed by this object.
|
||||
char** prevRow;
|
||||
SResultRowPool* pool;
|
||||
|
||||
SArray* prevResult; // intermediate result, SArray<SInterResult>
|
||||
STSBuf* pTsBuf; // timestamp filter list
|
||||
|
@ -440,13 +440,13 @@ typedef struct SOptrBasicInfo {
|
|||
SqlFunctionCtx* pCtx;
|
||||
SSDataBlock* pRes;
|
||||
uint32_t resRowSize;
|
||||
int32_t capacity;
|
||||
} SOptrBasicInfo;
|
||||
|
||||
typedef struct SOptrBasicInfo STableIntervalOperatorInfo;
|
||||
|
||||
typedef struct SAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
uint32_t seed;
|
||||
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
|
||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
||||
|
@ -455,6 +455,8 @@ typedef struct SAggOperatorInfo {
|
|||
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
|
||||
STableQueryInfo *current;
|
||||
uint32_t groupId;
|
||||
SGroupResInfo *pGroupResInfo;
|
||||
STableQueryInfo *pTableQueryInfo;
|
||||
} SAggOperatorInfo;
|
||||
|
||||
typedef struct SProjectOperatorInfo {
|
||||
|
@ -591,9 +593,9 @@ typedef struct SOrderOperatorInfo {
|
|||
} SOrderOperatorInfo;
|
||||
|
||||
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
|
||||
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
|
||||
|
@ -672,8 +674,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo*
|
|||
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
|
||||
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
|
||||
|
||||
STableQueryInfo* createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win,
|
||||
void* buf);
|
||||
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win);
|
||||
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
|
||||
|
||||
int32_t buildArithmeticExprFromMsg(SExprInfo* pArithExprInfo, void* pQueryMsg);
|
||||
|
|
|
@ -215,7 +215,7 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
|
||||
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity);
|
||||
|
||||
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
|
||||
static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
|
||||
|
@ -228,8 +228,7 @@ static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx
|
|||
static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
|
||||
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SArray* getOrderCheckColumns(STaskAttr* pQuery);
|
||||
|
||||
SArray* getOrderCheckColumns(STaskAttr* pQuery);
|
||||
|
||||
typedef struct SRowCompSupporter {
|
||||
STaskRuntimeEnv *pRuntimeEnv;
|
||||
|
@ -2007,7 +2006,7 @@ static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI
|
|||
return pFuncCtx;
|
||||
}
|
||||
|
||||
static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowCellInfoOffset) {
|
||||
static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowCellInfoOffset, uint32_t* pRowSize) {
|
||||
size_t numOfOutput = taosArrayGetSize(pExprInfo);
|
||||
|
||||
SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)calloc(numOfOutput, sizeof(SqlFunctionCtx));
|
||||
|
@ -2105,6 +2104,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
|
|||
for(int32_t i = 1; i < numOfOutput; ++i) {
|
||||
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i - 1);
|
||||
(*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr->base.interBytes);
|
||||
*pRowSize += pExpr->base.resSchema.bytes;
|
||||
}
|
||||
|
||||
setCtxTagColumnInfo(pFuncCtx, numOfOutput);
|
||||
|
@ -3250,8 +3250,7 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SqlFunctionCtx* pCt
|
|||
}
|
||||
}
|
||||
|
||||
void copyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock, int32_t* offset) {
|
||||
SGroupResInfo* pGroupResInfo = &pRuntimeEnv->groupResInfo;
|
||||
void copyToSDataBlock(SSDataBlock* pBlock, int32_t* offset, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pResBuf) {
|
||||
pBlock->info.rows = 0;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -3259,12 +3258,12 @@ void copyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlo
|
|||
// all results in current group have been returned to client, try next group
|
||||
if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) {
|
||||
assert(pGroupResInfo->index == 0);
|
||||
if ((code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pRuntimeEnv, offset)) != TSDB_CODE_SUCCESS) {
|
||||
// if ((code = mergeIntoGroupResult(&pGroupResInfo, pRuntimeEnv, offset)) != TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
}
|
||||
// }
|
||||
}
|
||||
|
||||
doCopyToSDataBlock(pRuntimeEnv, pGroupResInfo, TSDB_ORDER_ASC, pBlock);
|
||||
// doCopyToSDataBlock(pResBuf, pGroupResInfo, TSDB_ORDER_ASC, pBlock, );
|
||||
|
||||
// current data are all dumped to result buffer, clear it
|
||||
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
|
||||
|
@ -3275,9 +3274,9 @@ void copyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlo
|
|||
}
|
||||
|
||||
// enough results in data buffer, return
|
||||
if (pBlock->info.rows >= threshold) {
|
||||
break;
|
||||
}
|
||||
// if (pBlock->info.rows >= threshold) {
|
||||
// break;
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3381,7 +3380,7 @@ void setDefaultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, in
|
|||
initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols);
|
||||
}
|
||||
|
||||
void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int64_t uid, int32_t stage, SExecTaskInfo* pTaskInfo) {
|
||||
void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int32_t stage, SExecTaskInfo* pTaskInfo) {
|
||||
SOptrBasicInfo *pInfo = &pAggInfo->binfo;
|
||||
|
||||
SqlFunctionCtx* pCtx = pInfo->pCtx;
|
||||
|
@ -3390,8 +3389,10 @@ void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int64_t uid, int32_t sta
|
|||
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
|
||||
|
||||
int64_t tid = 0;
|
||||
int64_t groupId = 0;
|
||||
|
||||
pAggInfo->keyBuf = realloc(pAggInfo->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES);
|
||||
SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid, pTaskInfo, false, pAggInfo);
|
||||
SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pAggInfo);
|
||||
|
||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
|
@ -3597,24 +3598,19 @@ static bool hasMainOutput(STaskAttr *pQueryAttr) {
|
|||
return false;
|
||||
}
|
||||
|
||||
STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf) {
|
||||
STableQueryInfo *createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win) {
|
||||
STableQueryInfo *pTableQueryInfo = buf;
|
||||
|
||||
// pTableQueryInfo->win = win;
|
||||
pTableQueryInfo->lastKey = win.skey;
|
||||
|
||||
// pTableQueryInfo->pTable = pTable;
|
||||
// pTableQueryInfo->cur.vgroupIndex = -1;
|
||||
|
||||
// set more initial size of interval/groupby query
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr) || groupbyColumn) {
|
||||
// if (/*QUERY_IS_INTERVAL_QUERY(pQueryAttr) || */groupbyColumn) {
|
||||
int32_t initialSize = 128;
|
||||
int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize, TSDB_DATA_TYPE_INT);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
} else { // in other aggregate query, do not initialize the windowResInfo
|
||||
}
|
||||
// } else { // in other aggregate query, do not initialize the windowResInfo
|
||||
// }
|
||||
|
||||
return pTableQueryInfo;
|
||||
}
|
||||
|
@ -3962,9 +3958,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) {
|
|||
* @param result
|
||||
*/
|
||||
|
||||
static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock) {
|
||||
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
|
||||
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity) {
|
||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
|
||||
|
||||
|
@ -3990,13 +3984,13 @@ static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* p
|
|||
}
|
||||
|
||||
int32_t numOfRowsToCopy = pRow->numOfRows;
|
||||
if (numOfResult + numOfRowsToCopy >= pRuntimeEnv->resultInfo.capacity) {
|
||||
if (numOfResult + numOfRowsToCopy >= rowCapacity) {
|
||||
break;
|
||||
}
|
||||
|
||||
pGroupResInfo->index += 1;
|
||||
|
||||
SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pRow->pageId);
|
||||
SFilePage *page = getBufPage(pBuf, pRow->pageId);
|
||||
|
||||
int32_t offset = 0;
|
||||
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
|
||||
|
@ -4004,14 +3998,14 @@ static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* p
|
|||
int32_t bytes = pColInfoData->info.bytes;
|
||||
|
||||
char *out = pColInfoData->pData + numOfResult * bytes;
|
||||
char *in = getPosInResultPage(pQueryAttr, page, pRow->offset, offset);
|
||||
char *in = getPosInResultPage_rv(page, pRow->offset, offset);
|
||||
memcpy(out, in, bytes * numOfRowsToCopy);
|
||||
|
||||
offset += bytes;
|
||||
}
|
||||
|
||||
numOfResult += numOfRowsToCopy;
|
||||
if (numOfResult == pRuntimeEnv->resultInfo.capacity) { // output buffer is full
|
||||
if (numOfResult == rowCapacity) { // output buffer is full
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -4029,9 +4023,8 @@ static void toSDatablock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntime
|
|||
return;
|
||||
}
|
||||
|
||||
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
int32_t orderType = TSDB_ORDER_ASC;//(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
|
||||
doCopyToSDataBlock(pRuntimeEnv, pGroupResInfo, orderType, pBlock);
|
||||
doCopyToSDataBlock(NULL, pGroupResInfo, orderType, pBlock, 0);
|
||||
|
||||
// refactor : extract method
|
||||
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
|
@ -5359,7 +5352,7 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
|
|||
return pResBlock;
|
||||
}
|
||||
|
||||
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) {
|
||||
assert(repeatTime > 0);
|
||||
|
||||
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
||||
|
@ -5391,7 +5384,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order,
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) {
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) {
|
||||
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
||||
|
||||
pInfo->pTsdbReadHandle = pTsdbReadHandle;
|
||||
|
@ -6557,7 +6550,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
|
|||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
// copyToSDataBlock(NULL, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
@ -6597,7 +6590,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
|
|||
doCloseAllTimeWindow(pRuntimeEnv);
|
||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
|
||||
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
@ -6615,7 +6608,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
|
|||
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
@ -6653,7 +6646,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
|
|||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
@ -7058,24 +7051,31 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
|||
|
||||
static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows) {
|
||||
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows);
|
||||
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
|
||||
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize);
|
||||
|
||||
pInfo->pResultRowHashTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
pInfo->pResultRowListSet = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
pInfo->keyBuf = malloc(1024 + sizeof(int64_t) + POINTER_BYTES); // TODO:
|
||||
pInfo->pool = initResultRowPool(getResultRowSize(pExprInfo));
|
||||
pInfo->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
|
||||
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
|
||||
// TODO: number of tables
|
||||
pInfo->pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
|
||||
|
||||
STimeWindow win = {0, INT64_MAX};
|
||||
createTableQueryInfo(pInfo->pTableQueryInfo, false, win);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SExprInfo* exprArrayDup(SArray* pExprInfo) {
|
||||
size_t numOfOutput = taosArrayGetSize(pExprInfo);
|
||||
SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo));
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
|
||||
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
|
||||
assignExprInfo(&p[i], pExpr);
|
||||
}
|
||||
size_t numOfOutput = taosArrayGetSize(pExprInfo);
|
||||
SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo));
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
|
||||
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
|
||||
assignExprInfo(&p[i], pExpr);
|
||||
}
|
||||
|
||||
return p;
|
||||
}
|
||||
|
||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
|
||||
|
@ -7085,9 +7085,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
|
|||
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
|
||||
|
||||
initAggInfo(pInfo, pExprInfo, numOfRows);
|
||||
|
||||
pInfo->seed = rand();
|
||||
setDefaultOutputBuf_rv(pInfo, pInfo->seed, MAIN_SCAN, pTaskInfo);
|
||||
setDefaultOutputBuf_rv(pInfo, MAIN_SCAN, pTaskInfo);
|
||||
size_t numOfOutput = taosArrayGetSize(pExprInfo);
|
||||
|
||||
SExprInfo* p = exprArrayDup(pExprInfo);
|
||||
|
@ -8055,7 +8053,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
|
|||
|
||||
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhyNode*) pPhyNode, pHandle, (uint64_t) queryId, taskId);
|
||||
|
||||
return createDataBlocksOptScanInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
|
||||
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
|
||||
} else if (pPhyNode->info.type == OP_Exchange) {
|
||||
SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode;
|
||||
return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
|
||||
|
|
Loading…
Reference in New Issue