|
|
|
@ -7083,31 +7083,32 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
|
|
|
|
|
return pInfo->binfo.pRes;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, STaskRuntimeEnv* pRuntimeEnv, bool* newgroup) {
|
|
|
|
|
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SResultInfo* pResultInfo, bool* newgroup, SExecTaskInfo* pTaskInfo) {
|
|
|
|
|
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
|
|
|
|
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
|
|
|
|
|
|
|
|
|
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)? pTaskInfo->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
|
|
|
|
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
|
|
|
|
|
|
|
|
|
|
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
|
|
|
|
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
|
|
|
|
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
|
|
|
|
|
|
|
|
|
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
|
|
|
|
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity, pInfo->p);
|
|
|
|
|
pInfo->existNewGroupBlock = NULL;
|
|
|
|
|
*newgroup = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRuntimeEnv *pRuntimeEnv, bool *newgroup) {
|
|
|
|
|
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SResultInfo *pResultInfo, bool *newgroup) {
|
|
|
|
|
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
|
|
|
|
*newgroup = false;
|
|
|
|
|
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
|
|
|
|
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult)) {
|
|
|
|
|
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity, pInfo->p);
|
|
|
|
|
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// handle the cached new group data block
|
|
|
|
|
if (pInfo->existNewGroupBlock) {
|
|
|
|
|
doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup);
|
|
|
|
|
// doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -7120,10 +7121,10 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
|
|
|
|
|
// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
|
|
|
|
|
// return pInfo->pRes;
|
|
|
|
|
// }
|
|
|
|
|
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup);
|
|
|
|
|
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
|
|
|
|
|
return pInfo->pRes;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
|
|
|
|
|
while(1) {
|
|
|
|
@ -7827,7 +7828,33 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, bool multigroupResult, SExecTaskInfo* pTaskInfo) {
|
|
|
|
|
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal,
|
|
|
|
|
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
|
|
|
|
|
struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal);
|
|
|
|
|
|
|
|
|
|
TSKEY sk = TMIN(win.skey, win.ekey);
|
|
|
|
|
TSKEY ek = TMAX(win.skey, win.ekey);
|
|
|
|
|
|
|
|
|
|
// TODO set correct time precision
|
|
|
|
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
|
|
|
|
getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, sk, ek, &w);
|
|
|
|
|
|
|
|
|
|
int32_t order = TSDB_ORDER_ASC;
|
|
|
|
|
pInfo->pFillInfo =
|
|
|
|
|
taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval->sliding,
|
|
|
|
|
pInterval->slidingUnit, (int8_t)pInterval->precision, fillType, pColInfo, id);
|
|
|
|
|
|
|
|
|
|
pInfo->p = calloc(numOfCols, POINTER_BYTES);
|
|
|
|
|
|
|
|
|
|
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
|
|
|
|
|
return TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
} else {
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
|
|
|
|
|
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo) {
|
|
|
|
|
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
|
|
|
|
|
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
|
|
|
|
|
|
|
|
@ -7836,23 +7863,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
|
|
|
|
pInfo->intervalInfo = *pInterval;
|
|
|
|
|
|
|
|
|
|
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
|
|
|
|
{
|
|
|
|
|
// struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pQueryAttr->fillVal);
|
|
|
|
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
|
|
|
|
|
|
|
|
|
TSKEY sk = TMIN(pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
|
|
|
|
TSKEY ek = TMAX(pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
|
|
|
|
getAlignQueryTimeWindow(pInterval, pInterval->precision, pTaskInfo->window.skey, sk, ek, &w);
|
|
|
|
|
|
|
|
|
|
int32_t order = TSDB_ORDER_ASC;
|
|
|
|
|
|
|
|
|
|
// pInfo->pFillInfo =
|
|
|
|
|
// taosCreateFillInfo(order, w.skey, 0, (int32_t)pResultInfo->capacity, numOfCols,
|
|
|
|
|
// pInterval->sliding, pInterval->slidingUnit,
|
|
|
|
|
// (int8_t)pInterval->precision, pQueryAttr->fillType, pColInfo, pTaskInfo->id.str);
|
|
|
|
|
|
|
|
|
|
pInfo->p = calloc(numOfCols, POINTER_BYTES);
|
|
|
|
|
}
|
|
|
|
|
// int32_t code = initFillInfo(pInfo, pExpr, numOfCols, fillVal, , pResultInfo->capacity, pTaskInfo->id.str, pInterval, fillType);
|
|
|
|
|
// if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
// goto _error;
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
pOperator->name = "FillOperator";
|
|
|
|
|
pOperator->blockingOptr = false;
|
|
|
|
@ -7869,6 +7883,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
|
|
|
|
|
|
|
|
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
|
|
|
|
return pOperator;
|
|
|
|
|
|
|
|
|
|
_error:
|
|
|
|
|
tfree(pOperator);
|
|
|
|
|
tfree(pInfo);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
|
|
|
|
@ -8377,7 +8396,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
|
|
|
|
|
return pTaskInfo;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId);
|
|
|
|
|
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableGroupInfo *pTableGroupInfo, uint64_t queryId, uint64_t taskId);
|
|
|
|
|
|
|
|
|
|
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId);
|
|
|
|
|
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
|
|
|
@ -8385,16 +8404,12 @@ static SArray* extractScanColumnId(SNodeList* pNodeList);
|
|
|
|
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
|
|
|
|
// if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node
|
|
|
|
|
// pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0);
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
|
|
|
|
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
|
|
|
|
|
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
|
|
|
|
|
|
|
|
|
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
|
|
|
|
|
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId);
|
|
|
|
|
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
|
|
|
|
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
|
|
|
|
|
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
|
|
|
|
|
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
|
|
|
@ -8403,10 +8418,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
|
|
|
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) {
|
|
|
|
|
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
|
|
|
|
|
|
|
|
|
STableGroupInfo groupInfo = {0};
|
|
|
|
|
|
|
|
|
|
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
|
|
|
|
|
SArray* tableIdList = extractTableIdList(&groupInfo);
|
|
|
|
|
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
|
|
|
|
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
|
|
|
|
|
|
|
|
|
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
|
|
|
|
|
SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols);
|
|
|
|
@ -8599,22 +8612,20 @@ SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) {
|
|
|
|
|
return tableIdList;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) {
|
|
|
|
|
STableGroupInfo groupInfo = {0};
|
|
|
|
|
|
|
|
|
|
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableGroupInfo *pTableGroupInfo, uint64_t queryId, uint64_t taskId) {
|
|
|
|
|
uint64_t uid = pTableScanNode->scan.uid;
|
|
|
|
|
int32_t code = doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, &groupInfo, queryId, taskId);
|
|
|
|
|
int32_t code = doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
goto _error;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (groupInfo.numOfTables == 0) {
|
|
|
|
|
if (pTableGroupInfo->numOfTables == 0) {
|
|
|
|
|
code = 0;
|
|
|
|
|
qDebug("no table qualified for query, TID:0x%"PRIx64", QID:0x%"PRIx64, taskId, queryId);
|
|
|
|
|
goto _error;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return createDataReaderImpl(pTableScanNode, &groupInfo, pHandle->reader, queryId, taskId);
|
|
|
|
|
return createDataReaderImpl(pTableScanNode, pTableGroupInfo, pHandle->reader, queryId, taskId);
|
|
|
|
|
|
|
|
|
|
_error:
|
|
|
|
|
terrno = code;
|
|
|
|
|