[td-13039] update test.
This commit is contained in:
parent
a2d7ce3045
commit
503300a2bd
|
@ -327,7 +327,7 @@ bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
|||
|
||||
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
|
||||
struct SFillColInfo* pFillCol, void* handle);
|
||||
struct SFillColInfo* pFillCol, const char* id);
|
||||
|
||||
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
|
||||
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
|
||||
|
|
|
@ -659,7 +659,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
||||
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
|
||||
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
||||
|
|
|
@ -7083,31 +7083,32 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
|
|||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, STaskRuntimeEnv* pRuntimeEnv, bool* newgroup) {
|
||||
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SResultInfo* pResultInfo, bool* newgroup, SExecTaskInfo* pTaskInfo) {
|
||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
||||
|
||||
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)? pTaskInfo->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
||||
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity, pInfo->p);
|
||||
pInfo->existNewGroupBlock = NULL;
|
||||
*newgroup = true;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRuntimeEnv *pRuntimeEnv, bool *newgroup) {
|
||||
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SResultInfo *pResultInfo, bool *newgroup) {
|
||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
*newgroup = false;
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p);
|
||||
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult)) {
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity, pInfo->p);
|
||||
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// handle the cached new group data block
|
||||
if (pInfo->existNewGroupBlock) {
|
||||
doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup);
|
||||
// doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7120,10 +7121,10 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
|
||||
// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
|
||||
// return pInfo->pRes;
|
||||
// }
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup);
|
||||
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
|
||||
return pInfo->pRes;
|
||||
}
|
||||
|
||||
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
|
||||
while(1) {
|
||||
|
@ -7827,7 +7828,33 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, bool multigroupResult, SExecTaskInfo* pTaskInfo) {
|
||||
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal,
|
||||
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
|
||||
struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal);
|
||||
|
||||
TSKEY sk = TMIN(win.skey, win.ekey);
|
||||
TSKEY ek = TMAX(win.skey, win.ekey);
|
||||
|
||||
// TODO set correct time precision
|
||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||
getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, sk, ek, &w);
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
pInfo->pFillInfo =
|
||||
taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval->sliding,
|
||||
pInterval->slidingUnit, (int8_t)pInterval->precision, fillType, pColInfo, id);
|
||||
|
||||
pInfo->p = calloc(numOfCols, POINTER_BYTES);
|
||||
|
||||
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
|
||||
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo) {
|
||||
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
|
@ -7836,23 +7863,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
|||
pInfo->intervalInfo = *pInterval;
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
{
|
||||
// struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pQueryAttr->fillVal);
|
||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||
|
||||
TSKEY sk = TMIN(pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
||||
TSKEY ek = TMAX(pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
||||
getAlignQueryTimeWindow(pInterval, pInterval->precision, pTaskInfo->window.skey, sk, ek, &w);
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
|
||||
// pInfo->pFillInfo =
|
||||
// taosCreateFillInfo(order, w.skey, 0, (int32_t)pResultInfo->capacity, numOfCols,
|
||||
// pInterval->sliding, pInterval->slidingUnit,
|
||||
// (int8_t)pInterval->precision, pQueryAttr->fillType, pColInfo, pTaskInfo->id.str);
|
||||
|
||||
pInfo->p = calloc(numOfCols, POINTER_BYTES);
|
||||
}
|
||||
// int32_t code = initFillInfo(pInfo, pExpr, numOfCols, fillVal, , pResultInfo->capacity, pTaskInfo->id.str, pInterval, fillType);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// goto _error;
|
||||
// }
|
||||
|
||||
pOperator->name = "FillOperator";
|
||||
pOperator->blockingOptr = false;
|
||||
|
@ -7869,6 +7883,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
|||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
tfree(pOperator);
|
||||
tfree(pInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
|
||||
|
@ -8377,7 +8396,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
|
|||
return pTaskInfo;
|
||||
}
|
||||
|
||||
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId);
|
||||
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableGroupInfo *pTableGroupInfo, uint64_t queryId, uint64_t taskId);
|
||||
|
||||
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId);
|
||||
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
||||
|
@ -8385,16 +8404,12 @@ static SArray* extractScanColumnId(SNodeList* pNodeList);
|
|||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||
|
||||
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
||||
// if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node
|
||||
// pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
// }
|
||||
|
||||
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
|
||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
||||
|
||||
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
|
||||
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId);
|
||||
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
|
||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||
|
@ -8403,10 +8418,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) {
|
||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||
|
||||
STableGroupInfo groupInfo = {0};
|
||||
|
||||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
|
||||
SArray* tableIdList = extractTableIdList(&groupInfo);
|
||||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||
SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols);
|
||||
|
@ -8599,22 +8612,20 @@ SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) {
|
|||
return tableIdList;
|
||||
}
|
||||
|
||||
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) {
|
||||
STableGroupInfo groupInfo = {0};
|
||||
|
||||
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableGroupInfo *pTableGroupInfo, uint64_t queryId, uint64_t taskId) {
|
||||
uint64_t uid = pTableScanNode->scan.uid;
|
||||
int32_t code = doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, &groupInfo, queryId, taskId);
|
||||
int32_t code = doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (groupInfo.numOfTables == 0) {
|
||||
if (pTableGroupInfo->numOfTables == 0) {
|
||||
code = 0;
|
||||
qDebug("no table qualified for query, TID:0x%"PRIx64", QID:0x%"PRIx64, taskId, queryId);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
return createDataReaderImpl(pTableScanNode, &groupInfo, pHandle->reader, queryId, taskId);
|
||||
return createDataReaderImpl(pTableScanNode, pTableGroupInfo, pHandle->reader, queryId, taskId);
|
||||
|
||||
_error:
|
||||
terrno = code;
|
||||
|
|
|
@ -61,7 +61,7 @@ typedef struct SFillInfo {
|
|||
|
||||
SFillColInfo* pFillCol; // column info for fill operations
|
||||
SFillTagColInfo* pTags; // tags value for filling gap
|
||||
void* handle; // for debug purpose
|
||||
const char* id;
|
||||
} SFillInfo;
|
||||
|
||||
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
|
||||
|
|
|
@ -342,7 +342,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
|||
|
||||
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
|
||||
struct SFillColInfo* pCol, void* handle) {
|
||||
struct SFillColInfo* pCol, const char* id) {
|
||||
if (fillType == TSDB_FILL_NONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -357,7 +357,7 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
|
|||
pFillInfo->numOfCols = numOfCols;
|
||||
pFillInfo->precision = precision;
|
||||
pFillInfo->alloc = capacity;
|
||||
pFillInfo->handle = handle;
|
||||
pFillInfo->id = id;
|
||||
|
||||
pFillInfo->interval.interval = slidingTime;
|
||||
pFillInfo->interval.intervalUnit = slidingUnit;
|
||||
|
|
Loading…
Reference in New Issue