[td-2895] refactor.

This commit is contained in:
Haojun Liao 2021-02-27 15:51:24 +08:00
parent 5746730a2d
commit 18112c74cb
3 changed files with 155 additions and 183 deletions

View File

@ -42,6 +42,8 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows)
enum { enum {
// when query starts to execute, this status will set // when query starts to execute, this status will set
QUERY_NOT_COMPLETED = 0x1u, QUERY_NOT_COMPLETED = 0x1u,
@ -281,8 +283,7 @@ enum {
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
uint8_t operatorType; uint8_t operatorType;
bool blockingOptr; // block operator or not bool blockingOptr; // block operator or not
uint8_t completed; // denote if current operator is completed uint8_t status; // denote if current operator is completed
uint32_t seed; // operator seed
int32_t numOfOutput; // number of columns of the current operator results int32_t numOfOutput; // number of columns of the current operator results
char *name; // name, used to show the query execution plan char *name; // name, used to show the query execution plan
void *info; // extension attribution void *info; // extension attribution
@ -306,7 +307,6 @@ typedef struct SQInfo {
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
SQuery query; SQuery query;
SHashObj* arrTableIdInfo; SHashObj* arrTableIdInfo;
/* /*
@ -363,13 +363,14 @@ typedef struct STableScanInfo {
SExprInfo *pExpr; SExprInfo *pExpr;
int32_t numOfOutput; int32_t numOfOutput;
int64_t elapsedTime; int64_t elapsedTime;
} STableScanInfo; } STableScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
SColumnInfo* pCols; SColumnInfo* pCols;
SSDataBlock* pRes; SSDataBlock* pRes;
int32_t totalTables;
int32_t currentIndex;
} STagScanInfo; } STagScanInfo;
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
@ -379,12 +380,17 @@ typedef struct SOptrBasicInfo {
SSDataBlock *pRes; SSDataBlock *pRes;
} SOptrBasicInfo; } SOptrBasicInfo;
typedef struct SOptrBasicInfo SAggOperatorInfo; typedef struct SOptrBasicInfo STableIntervalOperatorInfo;
typedef struct SOptrBasicInfo SHashIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
uint32_t seed;
} SAggOperatorInfo;
typedef struct SArithOperatorInfo { typedef struct SArithOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
int32_t bufCapacity; int32_t bufCapacity;
uint32_t seed;
} SArithOperatorInfo; } SArithOperatorInfo;
typedef struct SLimitOperatorInfo { typedef struct SLimitOperatorInfo {
@ -401,10 +407,10 @@ typedef struct SFillOperatorInfo {
int64_t totalInputRows; int64_t totalInputRows;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SHashGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
int32_t colIndex; int32_t colIndex;
} SHashGroupbyOperatorInfo; } SGroupbyOperatorInfo;
void freeParam(SQueryParam *param); void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);

View File

@ -1198,7 +1198,7 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
} }
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
SHashIntervalOperatorInfo* pInfo, SSDataBlock* pSDataBlock, int32_t groupId) { STableIntervalOperatorInfo* pInfo, SSDataBlock* pSDataBlock, int32_t groupId) {
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
int32_t numOfOutput = pOperatorInfo->numOfOutput; int32_t numOfOutput = pOperatorInfo->numOfOutput;
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
@ -1250,8 +1250,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1; // int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1;
// doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], // doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0],
// -1, tsCols[0], p, // -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP);
// w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
@ -1298,7 +1297,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
} }
static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SHashGroupbyOperatorInfo *pInfo, static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo,
SSDataBlock *pSDataBlock) { SSDataBlock *pSDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
@ -1331,7 +1330,6 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat
} }
#if 0 #if 0
/** /**
* todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions
* @param pRuntimeEnv * @param pRuntimeEnv
@ -4214,7 +4212,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
} }
void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SAggOperatorInfo *pInfo, int32_t numOfOutput, void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int32_t numOfOutput,
int32_t groupIndex, TSKEY nextKey) { int32_t groupIndex, TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
@ -5460,6 +5458,7 @@ static UNUSED_FUNC bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t ind
return true; return true;
} }
#endif #endif
STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win) { STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win) {
STsdbQueryCond cond = { STsdbQueryCond cond = {
.colList = pQuery->colList, .colList = pQuery->colList,
@ -6177,7 +6176,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqScanTableOp"; pOperator->name = "SeqScanTableOp";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->completed = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols;
pOperator->exec = doTableScan; pOperator->exec = doTableScan;
@ -6195,25 +6194,25 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
if ((strcasecmp(name, "TableAggregate") == 0) || (strcasecmp(name, "STableAggregate") == 0)) { if ((strcasecmp(name, "TableAggregate") == 0) || (strcasecmp(name, "STableAggregate") == 0)) {
SAggOperatorInfo* pAggInfo = pDownstream->info; SAggOperatorInfo* pAggInfo = pDownstream->info;
pTableScanInfo->pCtx = pAggInfo->pCtx; pTableScanInfo->pCtx = pAggInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo; pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pAggInfo->rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
} else if (strcasecmp(name, "HashIntervalAgg") == 0) { } else if (strcasecmp(name, "HashIntervalAgg") == 0) {
SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->info; STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info;
pTableScanInfo->pCtx = pIntervalInfo->pCtx; pTableScanInfo->pCtx = pIntervalInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo; pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "HashGroupbyAgg") == 0) { } else if (strcasecmp(name, "HashGroupbyAgg") == 0) {
SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info; SGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info;
pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx; pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo; pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset;
} else if (strcasecmp(name, "STableIntervalAggOp") == 0) { } else if (strcasecmp(name, "STableIntervalAggOp") == 0) {
SHashIntervalOperatorInfo *pInfo = pDownstream->info; STableIntervalOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->pCtx; pTableScanInfo->pCtx = pInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
@ -6259,11 +6258,13 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
// this is a blocking operator // this is a blocking operator
static SSDataBlock* doAggregate(void* param) { static SSDataBlock* doAggregate(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
SAggOperatorInfo* pAggInfo = pOperator->info; SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
@ -6277,7 +6278,7 @@ static SSDataBlock* doAggregate(void* param) {
break; break;
} }
setTagVal_rv(pOperator, pQuery->current->pTable, pAggInfo->pCtx, pOperator->numOfOutput); setTagVal_rv(pOperator, pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// TODO opt perf // TODO opt perf
if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) { if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) {
@ -6286,36 +6287,38 @@ static SSDataBlock* doAggregate(void* param) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pAggInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
doAggregateImpl(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); doAggregateImpl(pOperator, pQuery->window.skey, pInfo->pCtx, pBlock);
} }
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult_rv(pOperator, pAggInfo->pCtx, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset); finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
pAggInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput); pInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
return pAggInfo->pRes; return pInfo->pRes;
} }
static SSDataBlock* doSTableAggregate(void* param) { static SSDataBlock* doSTableAggregate(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
SAggOperatorInfo* pAggInfo = pOperator->info; SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->completed == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pAggInfo->pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
if (pAggInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pAggInfo->pRes; return pInfo->pRes;
} }
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
@ -6329,7 +6332,7 @@ static SSDataBlock* doSTableAggregate(void* param) {
break; break;
} }
setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pAggInfo->pCtx, pOperator->numOfOutput); setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// TODO opt perf // TODO opt perf
if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) { if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) {
@ -6338,25 +6341,26 @@ static SSDataBlock* doSTableAggregate(void* param) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pAggInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
TSKEY k = (pQuery->order.order == TSDB_ORDER_ASC)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1; TSKEY k = (pQuery->order.order == TSDB_ORDER_ASC)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1;
setExecutionContext_rv(pRuntimeEnv, pAggInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k); setExecutionContext_rv(pRuntimeEnv, pInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k);
doAggregateImpl(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); doAggregateImpl(pOperator, pQuery->window.skey, pInfo->pCtx, pBlock);
} }
pOperator->completed = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pAggInfo->resultRowInfo); closeAllResultRows(&pInfo->resultRowInfo);
updateWindowResNumOfRes_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset); updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo,
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pAggInfo->resultRowInfo, 0); pInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pAggInfo->pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
if (pAggInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pAggInfo->pRes; return pInfo->pRes;
} }
static SSDataBlock* doArithmeticOperation(void* param) { static SSDataBlock* doArithmeticOperation(void* param) {
@ -6398,7 +6402,7 @@ static SSDataBlock* doArithmeticOperation(void* param) {
static SSDataBlock* doLimit(void* param) { static SSDataBlock* doLimit(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
@ -6407,7 +6411,7 @@ static SSDataBlock* doLimit(void* param) {
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED);
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
@ -6417,7 +6421,7 @@ static SSDataBlock* doLimit(void* param) {
pInfo->total = pInfo->limit; pInfo->total = pInfo->limit;
setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED);
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} else { } else {
pInfo->total += pBlock->info.rows; pInfo->total += pBlock->info.rows;
} }
@ -6428,7 +6432,7 @@ static SSDataBlock* doLimit(void* param) {
// TODO add log // TODO add log
static SSDataBlock* doOffset(void* param) { static SSDataBlock* doOffset(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo *)param; SOperatorInfo *pOperator = (SOperatorInfo *)param;
if (pOperator->completed == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
@ -6438,7 +6442,7 @@ static SSDataBlock* doOffset(void* param) {
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
@ -6465,18 +6469,18 @@ static SSDataBlock* doOffset(void* param) {
static SSDataBlock* doHashIntervalAgg(void* param) { static SSDataBlock* doHashIntervalAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
SHashIntervalOperatorInfo* pIntervalInfo = pOperator->info; STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->completed == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pIntervalInfo->pRes; return pIntervalInfo->pRes;
@ -6503,7 +6507,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
pQuery->order.order = order; pQuery->order.order = order;
pQuery->window = win; pQuery->window = win;
pOperator->completed = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pIntervalInfo->resultRowInfo); closeAllResultRows(&pIntervalInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
@ -6512,7 +6516,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes;
@ -6520,17 +6524,17 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
static SSDataBlock* doSTableIntervalAgg(void* param) { static SSDataBlock* doSTableIntervalAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
SHashIntervalOperatorInfo* pIntervalInfo = pOperator->info; STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->completed == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pIntervalInfo->pRes; return pIntervalInfo->pRes;
@ -6557,14 +6561,14 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pIntervalInfo, pBlock, pTableQueryInfo->groupIndex); hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pIntervalInfo, pBlock, pTableQueryInfo->groupIndex);
} }
pOperator->completed = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
pQuery->order.order = order; // TODO : restore the order pQuery->order.order = order; // TODO : restore the order
doCloseAllTimeWindow(pRuntimeEnv); doCloseAllTimeWindow(pRuntimeEnv);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pIntervalInfo->pRes; return pIntervalInfo->pRes;
@ -6572,18 +6576,18 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
static SSDataBlock* doHashGroupbyAgg(void* param) { static SSDataBlock* doHashGroupbyAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
SHashGroupbyOperatorInfo *pInfo = pOperator->info; SGroupbyOperatorInfo *pInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->completed == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
@ -6607,7 +6611,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
hashGroupbyAgg(pRuntimeEnv, pOperator, pInfo, pBlock); hashGroupbyAgg(pRuntimeEnv, pOperator, pInfo, pBlock);
} }
pOperator->completed = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo); closeAllResultRows(&pInfo->binfo.resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
@ -6621,7 +6625,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
@ -6629,7 +6633,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
static SSDataBlock* doFill(void* param) { static SSDataBlock* doFill(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
@ -6645,7 +6649,7 @@ static SSDataBlock* doFill(void* param) {
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) { if (pBlock == NULL) {
if (pInfo->totalInputRows == 0) { if (pInfo->totalInputRows == 0) {
pOperator->completed = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
@ -6693,24 +6697,24 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, 1); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, 1);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
int64_t seed = rand(); pInfo->seed = rand();
setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset, seed); setDefaultOutputBuf(pRuntimeEnv, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.pRes,
pInfo->binfo.rowCellInfoOffset, pInfo->seed);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->completed = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->seed = seed; // TODO refactor: seed to move to pInfo??
pOperator->exec = doAggregate; pOperator->exec = doAggregate;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyBasicOperatorInfo;
@ -6738,7 +6742,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
} }
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
SHashGroupbyOperatorInfo* pInfo = (SHashGroupbyOperatorInfo*) param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
} }
@ -6750,14 +6754,14 @@ static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) {
SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, 1); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, 1);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "STableAggregate"; pOperator->name = "STableAggregate";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->completed = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
@ -6773,26 +6777,26 @@ SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo)); SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo));
int64_t seed = rand(); pInfo->seed = rand();
pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity; pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
setDefaultOutputBuf(pRuntimeEnv, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->pRes, pBInfo->rowCellInfoOffset, seed); setDefaultOutputBuf(pRuntimeEnv, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->pRes, pBInfo->rowCellInfoOffset,
pInfo->seed);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ArithmeticOp"; pOperator->name = "ArithmeticOp";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->completed = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->seed = seed;
pOperator->exec = doArithmeticOperation; pOperator->exec = doArithmeticOperation;
pOperator->cleanup = destroyArithOperatorInfo; pOperator->cleanup = destroyArithOperatorInfo;
@ -6808,7 +6812,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->name = "LimitOp"; pOperator->name = "LimitOp";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->completed = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->exec = doLimit; pOperator->exec = doLimit;
pOperator->info = pInfo; pOperator->info = pInfo;
@ -6825,7 +6829,7 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
pOperator->name = "OffsetOp"; pOperator->name = "OffsetOp";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->completed = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->exec = doOffset; pOperator->exec = doOffset;
pOperator->info = pInfo; pOperator->info = pInfo;
@ -6835,7 +6839,7 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
} }
SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
@ -6845,7 +6849,7 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
pOperator->name = "HashIntervalAgg"; pOperator->name = "HashIntervalAgg";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->completed = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
@ -6859,7 +6863,7 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
} }
SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
@ -6868,7 +6872,7 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "STableIntervalAggOp"; pOperator->name = "STableIntervalAggOp";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->completed = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
@ -6882,7 +6886,7 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S
} }
SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SHashGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SHashGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
@ -6892,7 +6896,7 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "HashGroupbyAgg"; pOperator->name = "HashGroupbyAgg";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->completed = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
@ -6914,7 +6918,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->name = "FillOp"; pOperator->name = "FillOp";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->completed = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
@ -6929,35 +6933,32 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
static SSDataBlock* doTagScan(void* param) { static SSDataBlock* doTagScan(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
STagScanInfo *pTagScanInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity;
assert(numOfGroup == 0 || numOfGroup == 1);
if (numOfGroup == 0) {
return NULL;
}
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); STagScanInfo *pInfo = pOperator->info;
SSDataBlock *pRes = pInfo->pRes;
size_t num = taosArrayGetSize(pa);
assert(num == pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
int32_t count = 0; int32_t count = 0;
// int32_t functionId = pOperator->pExpr[0].base.functionId; SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
/*if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
int32_t functionId = pOperator->pExpr[0].base.functionId;
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
SQuery* pQuery = pRuntimeEnv->pQuery;
assert(pQuery->numOfOutput == 1); assert(pQuery->numOfOutput == 1);
SExprInfo* pExprInfo = &pOperator->pExpr[0]; SExprInfo* pExprInfo = &pOperator->pExpr[0];
int32_t rsize = pExprInfo->bytes; int32_t rsize = pExprInfo->bytes;
count = 0; count = 0;
int16_t bytes = pExprInfo->bytes; int16_t bytes = pExprInfo->bytes;
int16_t type = pExprInfo->type; int16_t type = pExprInfo->type;
for(int32_t i = 0; i < pQuery->numOfTags; ++i) { for(int32_t i = 0; i < pQuery->numOfTags; ++i) {
if (pQuery->tagColList[i].colId == pExprInfo->base.colInfo.colId) { if (pQuery->tagColList[i].colId == pExprInfo->base.colInfo.colId) {
@ -6967,11 +6968,13 @@ static SSDataBlock* doTagScan(void* param) {
} }
} }
while(pRuntimeEnv->tableIndex < num && count < pRuntimeEnv->resultInfo.capacity) { SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
int32_t i = pRuntimeEnv->tableIndex++;
while(pInfo->currentIndex < pInfo->totalTables && count < maxNumOfTables) {
int32_t i = pInfo->currentIndex++;
STableQueryInfo *item = taosArrayGetP(pa, i); STableQueryInfo *item = taosArrayGetP(pa, i);
char *output = pQuery->sdata[0]->data + count * rsize; char *output = pColInfo->pData + count * rsize;
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
output = varDataVal(output); output = varDataVal(output);
@ -6989,32 +6992,30 @@ static SSDataBlock* doTagScan(void* param) {
*(int32_t *)output = pQuery->vgId; *(int32_t *)output = pQuery->vgId;
output += sizeof(pQuery->vgId); output += sizeof(pQuery->vgId);
char* data = NULL;
if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
char* data = tsdbGetTableName(item->pTable); data = tsdbGetTableName(item->pTable);
memcpy(output, data, varDataTLen(data));
} else { } else {
char* data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes); data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes);
doSetTagValueToResultBuf(output, data, type, bytes);
} }
doSetTagValueToResultBuf(output, data, type, bytes);
count += 1; count += 1;
} }
qDebug("QInfo:%p create (tableId, tag) info completed, rows:%d", pRuntimeEnv->qinfo, count); qDebug("QInfo:%p create (tableId, tag) info completed, rows:%d", pRuntimeEnv->qinfo, count);
} /*else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query
} else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query
*(int64_t*) pQuery->sdata[0]->data = num; *(int64_t*) pQuery->sdata[0]->data = num;
count = 1; count = 1;
SET_STABLE_QUERY_OVER(pRuntimeEnv); SET_STABLE_QUERY_OVER(pRuntimeEnv);
qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pRuntimeEnv->qinfo, count); qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pRuntimeEnv->qinfo, count);
} else*/ { // return only the tags|table name etc. }*/ else { // return only the tags|table name etc.
count = 0; SExprInfo* pExprInfo = pOperator->pExpr; // todo use the column list instead of exprinfo
int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity;
SExprInfo* pExprInfo = pOperator->pExpr;
while(pRuntimeEnv->tableIndex < num && count < maxNumOfTables) { count = 0;
int32_t i = pRuntimeEnv->tableIndex++; while(pInfo->currentIndex < pInfo->totalTables && count < maxNumOfTables) {
int32_t i = pInfo->currentIndex++;
STableQueryInfo* item = taosArrayGetP(pa, i); STableQueryInfo* item = taosArrayGetP(pa, i);
@ -7026,7 +7027,7 @@ static SSDataBlock* doTagScan(void* param) {
continue; continue;
} }
SColumnInfoData* pColInfo = taosArrayGet(pTagScanInfo->pRes->pDataBlock, j); SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j);
type = pExprInfo[j].type; type = pExprInfo[j].type;
bytes = pExprInfo[j].bytes; bytes = pExprInfo[j].bytes;
@ -7043,21 +7044,27 @@ static SSDataBlock* doTagScan(void* param) {
count += 1; count += 1;
} }
pTagScanInfo->pRes->info.rows = count; pRes->info.rows = count;
qDebug("QInfo:%p create tag values results completed, rows:%d", pRuntimeEnv->qinfo, count); qDebug("QInfo:%p create tag values results completed, rows:%d", pRuntimeEnv->qinfo, count);
} }
return (pTagScanInfo->pRes->info.rows == 0)? NULL:pTagScanInfo->pRes; return (pRes->info.rows == 0)? NULL:pInfo->pRes;
} }
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
assert(numOfGroup == 0 || numOfGroup == 1);
pInfo->totalTables = pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
pInfo->currentIndex = 0;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqTagScanOp"; pOperator->name = "SeqTableTagScan";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->completed = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exec = doTagScan; pOperator->exec = doTagScan;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
@ -7088,57 +7095,18 @@ void tableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
#if 0
if (hasNotReturnedResults(pRuntimeEnv, &pRuntimeEnv->groupResInfo)) {
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
/*
* There are remain results that are not returned due to result interpolation
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
*/
// pRuntimeEnv->resultInfo.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata);
if (pRuntimeEnv->resultInfo.rows > 0) {
limitOperator(pQuery, pQInfo);
qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total);
} else {
return copyAndFillResult(pQInfo);
}
} else {
pRuntimeEnv->resultInfo.rows = 0;
assert(pRuntimeEnv->resultRowInfo.size > 0);
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
doSecondaryArithmeticProcess(pQuery);
if (pRuntimeEnv->resultInfo.rows > 0) {
limitOperator(pQuery, pQInfo);
}
if (pRuntimeEnv->resultInfo.rows > 0) {
qDebug("QInfo:%p %" PRId64 " rows returned from group results, total:%" PRId64 "", pQInfo, pRuntimeEnv->resultInfo.rows,
pRuntimeEnv->resultInfo.total);
} else {
qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pRuntimeEnv->resultInfo.total);
}
}
return;
}
#endif
// number of points returned during this query // number of points returned during this query
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1); assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1);
SArray* g = GET_TABLEGROUP(pRuntimeEnv, 0);
STableQueryInfo* item = taosArrayGetP(g, 0); SArray* g = GET_TABLEGROUP(pRuntimeEnv, 0);
pQuery->current = item; pQuery->current = taosArrayGetP(g, 0);
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
// record the total elapsed time // record the total elapsed time
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st); pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1);
} }
void buildTableBlockDistResult(SQInfo *pQInfo) { void buildTableBlockDistResult(SQInfo *pQInfo) {
@ -8478,7 +8446,6 @@ void buildTagQueryResult(SQInfo* pQInfo) {
} }
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
return;
#if 0 #if 0
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);

View File

@ -246,13 +246,12 @@ bool qTableQuery(qinfo_t qinfo) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo); qDebug("QInfo:%p query is killed", pQInfo);
} else if (pRuntimeEnv->outputBuf->info.rows == 0) { } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
pRuntimeEnv->resultInfo.total); pRuntimeEnv->resultInfo.total);
} else { } else {
qDebug("QInfo:%p query paused, %d rows returned, numOfTotal:%" PRId64 " rows", qDebug("QInfo:%p query paused, %d rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pRuntimeEnv->outputBuf->info.rows, pQInfo, GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total + GET_NUM_OF_RESULTS(pRuntimeEnv));
pRuntimeEnv->resultInfo.total + pRuntimeEnv->outputBuf->info.rows);
} }
return doBuildResCheck(pQInfo); return doBuildResCheck(pQInfo);
@ -289,7 +288,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
if (pQInfo->dataReady == QUERY_RESULT_READY) { if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true; *buildRes = true;
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo, pQuery->resultRowSize, qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo, pQuery->resultRowSize,
pRuntimeEnv->outputBuf->info.rows, tstrerror(pQInfo->code)); GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code));
} else { } else {
*buildRes = false; *buildRes = false;
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo); qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
@ -313,7 +312,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
int64_t s = pRuntimeEnv->outputBuf->info.rows; int64_t s = GET_NUM_OF_RESULTS(pRuntimeEnv);
size_t size = getResultSize(pQInfo, &s); size_t size = getResultSize(pQInfo, &s);
@ -339,7 +338,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
} }
(*pRsp)->precision = htons(pQuery->precision); (*pRsp)->precision = htons(pQuery->precision);
if (pQInfo->runtimeEnv.outputBuf->info.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
doDumpQueryResult(pQInfo, (*pRsp)->data); doDumpQueryResult(pQInfo, (*pRsp)->data);
} else { } else {
setQueryStatus(pQuery, QUERY_OVER); setQueryStatus(pQuery, QUERY_OVER);