diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 49a2fd5903..bf2937a220 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -89,7 +89,7 @@ enum { }; enum { - MASTER_SCAN = 0x0u, + MAIN_SCAN = 0x0u, REVERSE_SCAN = 0x1u, REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan MERGE_STAGE = 0x20u, diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 32bcb58bc0..ff87bc0085 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -64,6 +64,33 @@ enum { QUERY_OVER = 0x4u, }; +enum OPERATOR_TYPE_E { + OP_TableScan = 1, + OP_DataBlocksOptScan = 2, + OP_TableSeqScan = 3, + OP_TagScan = 4, + OP_TableBlockInfoScan= 5, + OP_Aggregate = 6, + OP_Project = 7, + OP_Groupby = 8, + OP_Limit = 9, + OP_SLimit = 10, + OP_TimeWindow = 11, + OP_SessionWindow = 12, + OP_Fill = 13, + OP_MultiTableAggregate = 14, + OP_MultiTableTimeInterval = 15, + OP_DummyInput = 16, //TODO remove it after fully refactor. + OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream. + OP_GlobalAggregate = 18, // global merge for the multi-way data sources. + OP_Filter = 19, + OP_Distinct = 20, + OP_Join = 21, + OP_StateWindow = 22, + OP_AllTimeWindow = 23, + OP_AllMultiTableTimeInterval = 24, + OP_Order = 25, +}; typedef struct SResultRowCell { uint64_t groupId; @@ -99,7 +126,7 @@ typedef struct STableQueryInfo { TSKEY lastKey; int32_t groupIndex; // group id in table list SVariant tag; - STimeWindow win; + STimeWindow win; // todo remove it later STSCursor cur; void* pTable; // for retrieve the page id list SResultRowInfo resInfo; @@ -127,31 +154,34 @@ typedef struct { int64_t sumRunTimes; } SOperatorProfResult; -typedef struct SQueryCostInfo { - uint64_t loadStatisTime; - uint64_t loadFileBlockTime; - uint64_t loadDataInCacheTime; - uint64_t loadStatisSize; - uint64_t loadFileBlockSize; - uint64_t loadDataInCacheSize; - - uint64_t loadDataTime; - uint64_t totalRows; - uint64_t totalCheckedRows; - uint32_t totalBlocks; - uint32_t loadBlocks; - uint32_t loadBlockStatis; - uint32_t discardBlocks; - uint64_t elapsedTime; - uint64_t firstStageMergeTime; - uint64_t winInfoSize; - uint64_t tableInfoSize; - uint64_t hashSize; - uint64_t numOfTimeWindows; +typedef struct STaskCostInfo { + int64_t start; + int64_t end; - SArray* queryProfEvents; //SArray - SHashObj* operatorProfResults; //map -} SQueryCostInfo; + uint64_t loadStatisTime; + uint64_t loadFileBlockTime; + uint64_t loadDataInCacheTime; + uint64_t loadStatisSize; + uint64_t loadFileBlockSize; + uint64_t loadDataInCacheSize; + + uint64_t loadDataTime; + uint64_t totalRows; + uint64_t totalCheckedRows; + uint32_t totalBlocks; + uint32_t loadBlocks; + uint32_t loadBlockStatis; + uint32_t discardBlocks; + uint64_t elapsedTime; + uint64_t firstStageMergeTime; + uint64_t winInfoSize; + uint64_t tableInfoSize; + uint64_t hashSize; + uint64_t numOfTimeWindows; + + SArray *queryProfEvents; //SArray + SHashObj *operatorProfResults; //map +} STaskCostInfo; typedef struct { int64_t vgroupLimit; @@ -235,9 +265,33 @@ typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); struct SOperatorInfo; +typedef struct STaskIdInfo { + uint64_t queryId; // this is also a request id + uint64_t subplanId; + uint64_t templateId; + uint64_t taskId; // this is a subplan id +} STaskIdInfo; + +typedef struct STaskInfo { + STaskIdInfo id; + char *content; + uint32_t status; + STimeWindow window; + STaskCostInfo cost; + int64_t owner; // if it is in execution + + STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure + pthread_mutex_t lock; // used to synchronize the rsp/query threads +// tsem_t ready; +// int32_t dataReady; // denote if query result is ready or not +// void* rspContext; // response context + char *sql; // query sql string + jmp_buf env; +} STaskInfo; + typedef struct STaskRuntimeEnv { jmp_buf env; - STaskAttr* pQueryAttr; + STaskAttr* pQueryAttr; uint32_t status; // query status void* qinfo; uint8_t scanFlag; // denotes reversed scan of data or not @@ -287,9 +341,10 @@ typedef struct SOperatorInfo { void *info; // extension attribution SExprInfo *pExpr; STaskRuntimeEnv *pRuntimeEnv; + STaskInfo *pTaskInfo; - struct SOperatorInfo ** pDownstream; // upstream pointer list - int32_t numOfUpstream; // number of upstream. The value is always ONE expect for join operator + struct SOperatorInfo **pDownstream; // downstram pointer list + int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator __operator_fn_t exec; __optr_cleanup_fn_t cleanup; } SOperatorInfo; @@ -321,7 +376,7 @@ typedef struct SQInfo { void* rspContext; // response context int64_t startExecTs; // start to exec timestamp char* sql; // query sql string - SQueryCostInfo summary; + STaskCostInfo summary; } SQInfo; typedef struct STaskParam { @@ -365,9 +420,12 @@ typedef struct STableScanInfo { SSDataBlock block; int32_t numOfOutput; int64_t elapsedTime; - int32_t tableIndex; - int32_t prevGroupId; // previous table group id + + int32_t prevGroupId; // previous table group id + + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + STimeWindow window; } STableScanInfo; typedef struct STagScanInfo { @@ -512,7 +570,7 @@ typedef struct SOrderOperatorInfo { void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); -SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime); +SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime); SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c new file mode 100644 index 0000000000..cfc958806b --- /dev/null +++ b/source/libs/executor/src/executorMain.c @@ -0,0 +1,579 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "tcache.h" +#include "tglobal.h" +#include "tmsg.h" +#include "exception.h" + +#include "thash.h" +#include "executorimpl.h" +#include "executor.h" +#include "tlosertree.h" +#include "ttypes.h" +#include "query.h" + +typedef struct STaskMgmt { + pthread_mutex_t lock; + SCacheObj *qinfoPool; // query handle pool + int32_t vgId; + bool closed; +} STaskMgmt; + +static void taskMgmtKillTaskFn(void* handle, void* param1) { + void** fp = (void**)handle; + qKillTask(*fp); +} + +static void freeqinfoFn(void *qhandle) { + void** handle = qhandle; + if (handle == NULL || *handle == NULL) { + return; + } + + qKillTask(*handle); + qDestroyTask(*handle); +} + +void freeParam(STaskParam *param) { + tfree(param->sql); + tfree(param->tagCond); + tfree(param->tbnameCond); + tfree(param->pTableIdList); + taosArrayDestroy(param->pOperator); + tfree(param->pExprs); + tfree(param->pSecExprs); + + tfree(param->pExpr); + tfree(param->pSecExpr); + + tfree(param->pGroupColIndex); + tfree(param->pTagColumnInfo); + tfree(param->pGroupbyExpr); + tfree(param->prevResult); +} + +// todo parse json to get the operator tree. + +int32_t qCreateTask(void* tsdb, int32_t vgId, void* pQueryMsg, qTaskInfo_t* pTaskInfo, uint64_t taskId) { + assert(pQueryMsg != NULL && tsdb != NULL); + + int32_t code = TSDB_CODE_SUCCESS; +#if 0 + STaskParam param = {0}; + code = convertQueryMsg(pQueryMsg, ¶m); + if (code != TSDB_CODE_SUCCESS) { + goto _over; + } + + if (pQueryMsg->numOfTables <= 0) { + qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables); + code = TSDB_CODE_QRY_INVALID_MSG; + goto _over; + } + + if (param.pTableIdList == NULL || taosArrayGetSize(param.pTableIdList) == 0) { + qError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg); + code = TSDB_CODE_QRY_INVALID_MSG; + goto _over; + } + + SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->tableCols}; + if ((code = createQueryFunc(&info, pQueryMsg->numOfOutput, ¶m.pExprs, param.pExpr, param.pTagColumnInfo, + pQueryMsg->queryType, pQueryMsg, param.pUdfInfo)) != TSDB_CODE_SUCCESS) { + goto _over; + } + + if (param.pSecExpr != NULL) { + if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, ¶m.pSecExprs, param.pSecExpr, param.pExprs, param.pUdfInfo)) != TSDB_CODE_SUCCESS) { + goto _over; + } + } + + if (param.colCond != NULL) { + if ((code = createQueryFilter(param.colCond, pQueryMsg->colCondLen, ¶m.pFilters)) != TSDB_CODE_SUCCESS) { + goto _over; + } + } + + param.pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, param.pGroupColIndex, &code); + if ((param.pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) { + goto _over; + } + + bool isSTableQuery = false; + STableGroupInfo tableGroupInfo = {0}; + int64_t st = taosGetTimestampUs(); + + if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) { + STableIdInfo *id = taosArrayGet(param.pTableIdList, 0); + + qDebug("qmsg:%p query normal table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid); + if ((code = tsdbGetOneTableGroup(tsdb, id->uid, pQueryMsg->window.skey, &tableGroupInfo)) != TSDB_CODE_SUCCESS) { + goto _over; + } + } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) { + isSTableQuery = true; + + // also note there's possibility that only one table in the super table + if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) { + STableIdInfo *id = taosArrayGet(param.pTableIdList, 0); + + // group by normal column, do not pass the group by condition to tsdb to group table into different group + int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; + if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(param.pGroupColIndex->flag)) { + numOfGroupByCols = 0; + } + + qDebug("qmsg:%p query stable, uid:%"PRIu64", tid:%d", pQueryMsg, id->uid, id->tid); + code = tsdbQuerySTableByTagCond(tsdb, id->uid, pQueryMsg->window.skey, param.tagCond, pQueryMsg->tagCondLen, + pQueryMsg->tagNameRelType, param.tbnameCond, &tableGroupInfo, param.pGroupColIndex, numOfGroupByCols); + + if (code != TSDB_CODE_SUCCESS) { + qError("qmsg:%p failed to query stable, reason: %s", pQueryMsg, tstrerror(code)); + goto _over; + } + } else { + code = tsdbGetTableGroupFromIdList(tsdb, param.pTableIdList, &tableGroupInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _over; + } + + qDebug("qmsg:%p query on %u tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables); + } + + int64_t el = taosGetTimestampUs() - st; + qDebug("qmsg:%p tag filter completed, numOfTables:%u, elapsed time:%"PRId64"us", pQueryMsg, tableGroupInfo.numOfTables, el); + } else { + assert(0); + } + + code = checkForQueryBuf(tableGroupInfo.numOfTables); + if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort + goto _over; + } + + assert(pQueryMsg->stableQuery == isSTableQuery); + (*pTaskInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, + param.pTagColumnInfo, param.pFilters, vgId, param.sql, qId, param.pUdfInfo); + + param.sql = NULL; + param.pExprs = NULL; + param.pSecExprs = NULL; + param.pGroupbyExpr = NULL; + param.pTagColumnInfo = NULL; + param.pFilters = NULL; + + if ((*pTaskInfo) == NULL) { + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _over; + } + param.pUdfInfo = NULL; + + code = initQInfo(&pQueryMsg->tsBuf, tsdb, NULL, *pTaskInfo, ¶m, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL); + + _over: + if (param.pGroupbyExpr != NULL) { + taosArrayDestroy(param.pGroupbyExpr->columnInfo); + } + + tfree(param.colCond); + + destroyUdfInfo(param.pUdfInfo); + + taosArrayDestroy(param.pTableIdList); + param.pTableIdList = NULL; + + freeParam(¶m); + + for (int32_t i = 0; i < pQueryMsg->numOfCols; i++) { + SColumnInfo* column = pQueryMsg->tableCols + i; + freeColumnFilterInfo(column->flist.filterInfo, column->flist.numOfFilters); + } + + filterFreeInfo(param.pFilters); + + //pTaskInfo already freed in initQInfo, but *pTaskInfo may not pointer to null; + if (code != TSDB_CODE_SUCCESS) { + *pTaskInfo = NULL; + } +#endif + + // if failed to add ref for all tables in this query, abort current query + return code; +} + +#ifdef TEST_IMPL +// wait moment +int waitMoment(SQInfo* pQInfo){ + if(pQInfo->sql) { + int ms = 0; + char* pcnt = strstr(pQInfo->sql, " count(*)"); + if(pcnt) return 0; + + char* pos = strstr(pQInfo->sql, " t_"); + if(pos){ + pos += 3; + ms = atoi(pos); + while(*pos >= '0' && *pos <= '9'){ + pos ++; + } + char unit_char = *pos; + if(unit_char == 'h'){ + ms *= 3600*1000; + } else if(unit_char == 'm'){ + ms *= 60*1000; + } else if(unit_char == 's'){ + ms *= 1000; + } + } + if(ms == 0) return 0; + printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql); + + if(ms < 1000) { + taosMsleep(ms); + } else { + int used_ms = 0; + while(used_ms < ms) { + taosMsleep(1000); + used_ms += 1000; + if(isQueryKilled(pQInfo)){ + printf("test check query is canceled, sleep break.%s\n", pQInfo->sql); + break; + } + } + } + } + return 1; +} +#endif + +bool qExecTask(qTaskInfo_t qinfo, uint64_t *qId) { + SQInfo *pQInfo = (SQInfo *)qinfo; + assert(pQInfo && pQInfo->signature == pQInfo); + int64_t threadId = taosGetSelfPthreadId(); + + int64_t curOwner = 0; + if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) { + qError("QInfo:0x%"PRIx64"-%p qhandle is now executed by thread:%p", pQInfo->qId, pQInfo, (void*) curOwner); + pQInfo->code = TSDB_CODE_QRY_IN_EXEC; + return false; + } + + *qId = pQInfo->qId; + if(pQInfo->startExecTs == 0) + pQInfo->startExecTs = taosGetTimestampMs(); + + if (isQueryKilled(pQInfo)) { + qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId); + return doBuildResCheck(pQInfo); + } + + STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) { + qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", pQInfo->qId); + setTaskStatus(pRuntimeEnv, QUERY_COMPLETED); + return doBuildResCheck(pQInfo); + } + + // error occurs, record the error code and return to client + int32_t ret = setjmp(pQInfo->runtimeEnv.env); + if (ret != TSDB_CODE_SUCCESS) { + publishQueryAbortEvent(pQInfo, ret); + pQInfo->code = ret; + qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code)); + return doBuildResCheck(pQInfo); + } + + qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId); + + bool newgroup = false; + publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC); + + int64_t st = taosGetTimestampUs(); + pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup); + pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st); +#ifdef TEST_IMPL + waitMoment(pQInfo); +#endif + publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC); + pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv); + + if (isQueryKilled(pQInfo)) { + qDebug("QInfo:0x%"PRIx64" query is killed", pQInfo->qId); + } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) { + qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, + pRuntimeEnv->resultInfo.total); + } else { + qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pQInfo->qId, + GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total); + } + + return doBuildResCheck(pQInfo); +} + +int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) { + SQInfo *pQInfo = (SQInfo *)qinfo; + + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + qError("QInfo invalid qhandle"); + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + + *buildRes = false; + if (IS_QUERY_KILLED(pQInfo)) { + qDebug("QInfo:0x%"PRIx64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code); + return pQInfo->code; + } + + int32_t code = TSDB_CODE_SUCCESS; + + if (tsRetrieveBlockingModel) { + pQInfo->rspContext = pRspContext; + tsem_wait(&pQInfo->ready); + *buildRes = true; + code = pQInfo->code; + } else { + STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; + + pthread_mutex_lock(&pQInfo->lock); + + assert(pQInfo->rspContext == NULL); + if (pQInfo->dataReady == QUERY_RESULT_READY) { + *buildRes = true; + qDebug("QInfo:0x%"PRIx64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQueryAttr->resultRowSize, + GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code)); + } else { + *buildRes = false; + qDebug("QInfo:0x%"PRIx64" retrieve req set query return result after paused", pQInfo->qId); + pQInfo->rspContext = pRspContext; + assert(pQInfo->rspContext != NULL); + } + + code = pQInfo->code; + pthread_mutex_unlock(&pQInfo->lock); + } + + return code; +} + +void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) { + SQInfo* pQInfo = (SQInfo*) qinfo; + assert(pQInfo != NULL); + + return pQInfo->rspContext; +} + +int32_t qKillTask(qTaskInfo_t qinfo) { + SQInfo *pQInfo = (SQInfo *)qinfo; + + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + + qDebug("QInfo:0x%"PRIx64" query killed", pQInfo->qId); + setQueryKilled(pQInfo); + + // Wait for the query executing thread being stopped/ + // Once the query is stopped, the owner of qHandle will be cleared immediately. + while (pQInfo->owner != 0) { + taosMsleep(100); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { + SQInfo *pQInfo = (SQInfo *)qinfo; + + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + + return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQInfo->runtimeEnv.status, QUERY_OVER); +} + +void qDestroyTask(qTaskInfo_t qHandle) { + SQInfo* pQInfo = (SQInfo*) qHandle; + if (!isValidQInfo(pQInfo)) { + return; + } + + qDebug("QInfo:0x%"PRIx64" query completed", pQInfo->qId); + queryCostStatis(pQInfo); // print the query cost summary + doDestroyTask(pQInfo); +} + +void* qOpenTaskMgmt(int32_t vgId) { + const int32_t refreshHandleInterval = 30; // every 30 seconds, refresh handle pool + + char cacheName[128] = {0}; + sprintf(cacheName, "qhandle_%d", vgId); + + STaskMgmt* pTaskMgmt = calloc(1, sizeof(STaskMgmt)); + if (pTaskMgmt == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + pTaskMgmt->qinfoPool = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshHandleInterval, true, freeqinfoFn, cacheName); + pTaskMgmt->closed = false; + pTaskMgmt->vgId = vgId; + + pthread_mutex_init(&pTaskMgmt->lock, NULL); + + qDebug("vgId:%d, open queryTaskMgmt success", vgId); + return pTaskMgmt; +} + +void qTaskMgmtNotifyClosing(void* pQMgmt) { + if (pQMgmt == NULL) { + return; + } + + STaskMgmt* pQueryMgmt = pQMgmt; + qInfo("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId); + + pthread_mutex_lock(&pQueryMgmt->lock); + pQueryMgmt->closed = true; + pthread_mutex_unlock(&pQueryMgmt->lock); + + taosCacheRefresh(pQueryMgmt->qinfoPool, taskMgmtKillTaskFn, NULL); +} + +void qQueryMgmtReOpen(void *pQMgmt) { + if (pQMgmt == NULL) { + return; + } + + STaskMgmt *pQueryMgmt = pQMgmt; + qInfo("vgId:%d, set querymgmt reopen", pQueryMgmt->vgId); + + pthread_mutex_lock(&pQueryMgmt->lock); + pQueryMgmt->closed = false; + pthread_mutex_unlock(&pQueryMgmt->lock); +} + +void qCleanupTaskMgmt(void* pQMgmt) { + if (pQMgmt == NULL) { + return; + } + + STaskMgmt* pQueryMgmt = pQMgmt; + int32_t vgId = pQueryMgmt->vgId; + + assert(pQueryMgmt->closed); + + SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool; + pQueryMgmt->qinfoPool = NULL; + + taosCacheCleanup(pqinfoPool); + pthread_mutex_destroy(&pQueryMgmt->lock); + tfree(pQueryMgmt); + + qDebug("vgId:%d, queryMgmt cleanup completed", vgId); +} + +void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) { + if (pMgmt == NULL) { + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + return NULL; + } + + STaskMgmt *pQueryMgmt = pMgmt; + if (pQueryMgmt->qinfoPool == NULL) { + qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo); + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + return NULL; + } + + pthread_mutex_lock(&pQueryMgmt->lock); + if (pQueryMgmt->closed) { + pthread_mutex_unlock(&pQueryMgmt->lock); + qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo); + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + return NULL; + } else { + void** handle = taosCachePut(pQueryMgmt->qinfoPool, &qId, sizeof(qId), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), + (getMaximumIdleDurationSec()*1000)); + pthread_mutex_unlock(&pQueryMgmt->lock); + + return handle; + } +} + +void** qAcquireTask(void* pMgmt, uint64_t _key) { + STaskMgmt *pQueryMgmt = pMgmt; + + if (pQueryMgmt->closed) { + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + return NULL; + } + + if (pQueryMgmt->qinfoPool == NULL) { + terrno = TSDB_CODE_QRY_INVALID_QHANDLE; + return NULL; + } + + void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &_key, sizeof(_key)); + if (handle == NULL || *handle == NULL) { + terrno = TSDB_CODE_QRY_INVALID_QHANDLE; + return NULL; + } else { + return handle; + } +} + +void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle) { + STaskMgmt *pQueryMgmt = pMgmt; + if (pQueryMgmt->qinfoPool == NULL) { + return NULL; + } + + taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle); + return 0; +} + +#if 0 +//kill by qid +int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) { + int32_t error = TSDB_CODE_SUCCESS; + void** handle = qAcquireTask(pMgmt, qId); + if(handle == NULL) return terrno; + + SQInfo* pQInfo = (SQInfo*)(*handle); + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId); + setQueryKilled(pQInfo); + + // wait query stop + int32_t loop = 0; + while (pQInfo->owner != 0) { + taosMsleep(waitMs); + if(loop++ > waitCount){ + error = TSDB_CODE_FAILED; + break; + } + } + + qReleaseTask(pMgmt, (void **)&handle, true); + return error; +} + +#endif \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a22f56753b..1d2740f0e0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -18,18 +18,19 @@ #include "ttime.h" #include "exception.h" -#include "function.h" +#include "../../../../contrib/cJson/cJSON.h" #include "executorimpl.h" -#include "thash.h" #include "function.h" #include "tcompare.h" #include "tcompression.h" +#include "thash.h" #include "ttypes.h" +#include "query.h" -#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN) +#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) #define IS_REPEAT_SCAN(runtime) ((runtime)->scanFlag == REPEAT_SCAN) -#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) +#define SET_MAIN_SCAN_FLAG(runtime) ((runtime)->scanFlag = MAIN_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey)) @@ -203,10 +204,12 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); static void destroyOperatorInfo(SOperatorInfo* pOperator); +void setTaskStatus(STaskInfo *pTaskInfo, int8_t status); + static void doSetOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; - if (pOperator->pRuntimeEnv != NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + if (pOperator->pTaskInfo != NULL) { + setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); } } @@ -1332,7 +1335,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); SResultRow* pResult = NULL; int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, @@ -1440,7 +1443,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); SResultRow* pResult = NULL; int32_t forwardStep = 0; @@ -1584,7 +1587,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf // primary timestamp column SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); SOptrBasicInfo* pBInfo = &pInfo->binfo; int64_t gap = pOperator->pRuntimeEnv->pQueryAttr->sw.gap; @@ -1990,30 +1993,30 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT // case OP_MultiTableTimeInterval: { // pRuntimeEnv->proot = // createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); // break; // } // case OP_AllMultiTableTimeInterval: { // pRuntimeEnv->proot = // createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); // break; // } // case OP_TimeWindow: { // pRuntimeEnv->proot = // createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; // if (opType != OP_DummyInput && opType != OP_Join) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); // } // break; // } // case OP_AllTimeWindow: { // pRuntimeEnv->proot = // createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; // if (opType != OP_DummyInput && opType != OP_Join) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); // } // break; // } @@ -2021,34 +2024,34 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT // pRuntimeEnv->proot = // createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); // -// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; // if (opType != OP_DummyInput) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); // } // break; // } // case OP_SessionWindow: { // pRuntimeEnv->proot = // createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; // if (opType != OP_DummyInput) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); // } // break; // } // case OP_MultiTableAggregate: { // pRuntimeEnv->proot = // createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); // break; // } // case OP_Aggregate: { // pRuntimeEnv->proot = // createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); // -// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; // if (opType != OP_DummyInput && opType != OP_Join) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); // } // break; // } @@ -2070,9 +2073,9 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT // // case OP_StateWindow: { // pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; // if (opType != OP_DummyInput) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); // } // break; // } @@ -2884,22 +2887,35 @@ void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFi } } -int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, - uint32_t* status) { - *status = BLK_DATA_NO_NEEDED; - pBlock->pDataBlock = NULL; - pBlock->pBlockAgg = NULL; - - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int64_t groupId = pRuntimeEnv->current->groupIndex; - bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); - - SQInfo* pQInfo = pRuntimeEnv->qinfo; - SQueryCostInfo* pCost = &pQInfo->summary; +int32_t loadDataBlock(STaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { + STaskCostInfo* pCost = &pTaskInfo->cost; pCost->totalBlocks += 1; pCost->totalRows += pBlock->info.rows; + pCost->totalCheckedRows += pBlock->info.rows; + pCost->loadBlocks += 1; + +// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); + if (pBlock->pDataBlock == NULL) { + return terrno; + } +} + +int32_t loadDataBlockOnDemand(STaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { + *status = BLK_DATA_NO_NEEDED; + + pBlock->pDataBlock = NULL; + pBlock->pBlockAgg = NULL; + +// int64_t groupId = pRuntimeEnv->current->groupIndex; +// bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); + + STaskCostInfo* pCost = &pTaskInfo->cost; + + pCost->totalBlocks += 1; + pCost->totalRows += pBlock->info.rows; +#if 0 if (pRuntimeEnv->pTsBuf != NULL) { (*status) = BLK_DATA_ALL_NEEDED; @@ -2924,7 +2940,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab // Calculate all time windows that are overlapping or contain current data block. // If current data block is contained by all possible time window, do not load current data block. if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 || - (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pQueryAttr, &pBlock->info))) { + (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) { (*status) = BLK_DATA_ALL_NEEDED; } @@ -2937,7 +2953,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { SResultRow* pResult = NULL; - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); @@ -2966,7 +2982,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab } SDataBlockInfo* pBlockInfo = &pBlock->info; - *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status); +// *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status); if ((*status) == BLK_DATA_NO_NEEDED || (*status) == BLK_DATA_DISCARD) { //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, @@ -2993,7 +3009,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { SResultRow* pResult = NULL; - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); @@ -3045,7 +3061,7 @@ int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTab // filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery); // } } - +#endif return TSDB_CODE_SUCCESS; } @@ -3264,9 +3280,8 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) } } -static void setupQueryRangeForReverseScan(STaskRuntimeEnv* pRuntimeEnv) { - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - +static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) { +#if 0 int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); for(int32_t i = 0; i < numOfGroups; ++i) { SArray *group = GET_TABLEGROUP(pRuntimeEnv, i); @@ -3285,6 +3300,8 @@ static void setupQueryRangeForReverseScan(STaskRuntimeEnv* pRuntimeEnv) { // assert(pCheckInfo->pTable == pTableKeyInfo->pTable); } } +#endif + } void switchCtxOrder(SQLFunctionCtx* pCtx, int32_t numOfOutput) { @@ -3432,35 +3449,33 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { } } -void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status) { +void setTaskStatus(STaskInfo *pTaskInfo, int8_t status) { if (status == QUERY_NOT_COMPLETED) { - pRuntimeEnv->status = status; + pTaskInfo->status = status; } else { // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first - CLEAR_QUERY_STATUS(pRuntimeEnv, QUERY_NOT_COMPLETED); - pRuntimeEnv->status |= status; + CLEAR_QUERY_STATUS(pTaskInfo, QUERY_NOT_COMPLETED); + pTaskInfo->status |= status; } } -static void setupEnvForReverseScan(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) { - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - - if (pRuntimeEnv->pTsBuf) { - SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order); - bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); - assert(ret); - } +static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) { +// if (pRuntimeEnv->pTsBuf) { +// SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order); +// bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); +// assert(ret); +// } // reverse order time range - SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); + SWAP(pTableScanInfo->window.skey, pTableScanInfo->window.ekey, TSKEY); - SET_REVERSE_SCAN_FLAG(pRuntimeEnv); - setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); + SET_REVERSE_SCAN_FLAG(pTableScanInfo); +// setTaskStatus(pTableScanInfo, QUERY_NOT_COMPLETED); switchCtxOrder(pCtx, numOfOutput); - SWITCH_ORDER(pQueryAttr->order.order); - setupQueryRangeForReverseScan(pRuntimeEnv); + SWITCH_ORDER(pTableScanInfo->order); + setupQueryRangeForReverseScan(pTableScanInfo); } void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { @@ -4041,7 +4056,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data // Check if query is completed or not for stable query or normal table query respectively. if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && pRuntimeEnv->proot->status == OP_EXEC_DONE) { - setQueryStatus(pRuntimeEnv, QUERY_OVER); +// setTaskStatus(pOperator->pTaskInfo, QUERY_OVER); } } @@ -4159,7 +4174,7 @@ void calculateOperatorProfResults(SQInfo* pQInfo) { void queryCostStatis(SQInfo *pQInfo) { STaskRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQueryCostInfo *pSummary = &pQInfo->summary; + STaskCostInfo *pSummary = &pQInfo->summary; uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable); hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map); @@ -4435,11 +4450,11 @@ void queryCostStatis(SQInfo *pQInfo) { void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream) { if (p->pDownstream == NULL) { - assert(p->numOfUpstream == 0); + assert(p->numOfDownstream == 0); } - p->pDownstream = realloc(p->pDownstream, POINTER_BYTES * (p->numOfUpstream + 1)); - p->pDownstream[p->numOfUpstream++] = pUpstream; + p->pDownstream = realloc(p->pDownstream, POINTER_BYTES * (p->numOfDownstream + 1)); + p->pDownstream[p->numOfDownstream++] = pUpstream; } static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); @@ -4587,21 +4602,21 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr return code; } - setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); +// setTaskStatus(pOperator->pTaskInfo, QUERY_NOT_COMPLETED); return TSDB_CODE_SUCCESS; } -static void doTableQueryInfoTimeWindowCheck(STaskAttr* pQueryAttr, STableQueryInfo* pTableQueryInfo) { - if (QUERY_IS_ASC_QUERY(pQueryAttr)) { +static void doTableQueryInfoTimeWindowCheck(STaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) { + if (order == TSDB_ORDER_ASC) { assert( (pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) && - (pTableQueryInfo->lastKey >= pTableQueryInfo->win.skey) && - (pTableQueryInfo->win.skey >= pQueryAttr->window.skey && pTableQueryInfo->win.ekey <= pQueryAttr->window.ekey)); + (pTableQueryInfo->lastKey >= pTaskInfo->window.skey) && + (pTableQueryInfo->win.skey >= pTaskInfo->window.skey && pTableQueryInfo->win.ekey <= pTaskInfo->window.ekey)); } else { assert( (pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) && - (pTableQueryInfo->lastKey <= pTableQueryInfo->win.skey) && - (pTableQueryInfo->win.skey <= pQueryAttr->window.skey && pTableQueryInfo->win.ekey >= pQueryAttr->window.ekey)); + (pTableQueryInfo->lastKey <= pTaskInfo->window.skey) && + (pTableQueryInfo->win.skey <= pTaskInfo->window.skey && pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey)); } } @@ -4664,44 +4679,37 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { SOperatorInfo *pOperator = (SOperatorInfo*) param; STableScanInfo *pTableScanInfo = pOperator->info; + STaskInfo *pTaskInfo = pOperator->pTaskInfo; + SSDataBlock *pBlock = &pTableScanInfo->block; - STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - STableGroupInfo *pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo; + STableGroupInfo *pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo; *newgroup = false; -#if 0 - while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { + + while (/*tsdbNextDataBlock(pTableScanInfo->pQueryHandle)*/1) { if (isQueryKilled(pOperator->pRuntimeEnv->qinfo)) { longjmp(pOperator->pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } pTableScanInfo->numOfBlocks += 1; - tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); +// tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); // todo opt - if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) { - STableQueryInfo** pTableQueryInfo = - (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.uid, sizeof(pBlock->info.uid)); - if (pTableQueryInfo == NULL) { - break; - } - - pRuntimeEnv->current = *pTableQueryInfo; - doTableQueryInfoTimeWindowCheck(pQueryAttr, *pTableQueryInfo); - - if (pRuntimeEnv->enableGroupData) { - if(pTableScanInfo->prevGroupId != -1 && pTableScanInfo->prevGroupId != (*pTableQueryInfo)->groupIndex) { - *newgroup = true; - } - } - - pTableScanInfo->prevGroupId = (*pTableQueryInfo)->groupIndex; - } +// if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) { +// STableQueryInfo** pTableQueryInfo = +// (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.uid, sizeof(pBlock->info.uid)); +// if (pTableQueryInfo == NULL) { +// break; +// } +// +// pRuntimeEnv->current = *pTableQueryInfo; +// doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order); +// } // this function never returns error? uint32_t status; - int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); + int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status); +// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { longjmp(pOperator->pRuntimeEnv->env, code); } @@ -4713,7 +4721,6 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { return pBlock; } -#endif return NULL; } @@ -4721,9 +4728,8 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { static SSDataBlock* doTableScan(void* param, bool *newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - STableScanInfo *pTableScanInfo = pOperator->info; - STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + STableScanInfo *pTableScanInfo = pOperator->info; + STaskInfo *pTaskInfo = pOperator->pTaskInfo; SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo; *newgroup = false; @@ -4746,14 +4752,14 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { // STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); // tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); - setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); - pRuntimeEnv->scanFlag = REPEAT_SCAN; - - if (pRuntimeEnv->pTsBuf) { - bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); - assert(ret); - } + setTaskStatus(pTaskInfo, QUERY_NOT_COMPLETED); + pTableScanInfo->scanFlag = REPEAT_SCAN; +// if (pTaskInfo->pTsBuf) { +// bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); +// assert(ret); +// } +// if (pResultRowInfo->size > 0) { pResultRowInfo->curPos = 0; } @@ -4763,17 +4769,15 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { } SSDataBlock *p = NULL; + // todo refactor if (pTableScanInfo->reverseTimes > 0) { - setupEnvForReverseScan(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); - + setupEnvForReverseScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); // STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); // tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); //qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, // GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey); - pRuntimeEnv->scanFlag = REVERSE_SCAN; - pTableScanInfo->times = 1; pTableScanInfo->current = 0; pTableScanInfo->reverseTimes = 0; @@ -4838,25 +4842,25 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { } -SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime) { - assert(repeatTime > 0); +SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime) { + assert(repeatTime > 0 && numOfOutput > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; - pInfo->times = repeatTime; - pInfo->reverseTimes = 0; - pInfo->order = pRuntimeEnv->pQueryAttr->order.order; - pInfo->current = 0; -// pInfo->prevGroupId = -1; + pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->times = repeatTime; + pInfo->reverseTimes = 0; + pInfo->order = order; + pInfo->current = 0; + pInfo->scanFlag = MAIN_SCAN; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableScanOperator"; -// pOperator->operatorType = OP_TableScan; + pOperator->operatorType = OP_TableScan; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = NULL; pOperator->exec = doTableScan; return pOperator; @@ -5080,7 +5084,7 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { tfree(pInfo->prevRow); } -SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, +SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); @@ -5143,7 +5147,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S pOperator->exec = doGlobalAggregate; pOperator->cleanup = destroyGlobalAggOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } @@ -5259,7 +5263,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; } -SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal) { +SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal) { SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); { @@ -5292,7 +5296,7 @@ SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->cleanup = destroyOrderOperatorInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } @@ -5315,12 +5319,12 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; - SOperatorInfo* upstream = pOperator->pDownstream[0]; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { - publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = upstream->exec(upstream, newgroup); - publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5330,8 +5334,8 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); } -// if (upstream->operatorType == OP_DataBlocksOptScan) { -// STableScanInfo* pScanInfo = upstream->info; +// if (downstream->operatorType == OP_DataBlocksOptScan) { +// STableScanInfo* pScanInfo = downstream->info; // order = getTableScanOrder(pScanInfo); // } @@ -5372,12 +5376,12 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; - SOperatorInfo* upstream = pOperator->pDownstream[0]; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { - publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = upstream->exec(upstream, newgroup); - publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5385,8 +5389,8 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); -// if (upstream->operatorType == OP_DataBlocksOptScan) { -// STableScanInfo* pScanInfo = upstream->info; +// if (downstream->operatorType == OP_DataBlocksOptScan) { +// STableScanInfo* pScanInfo = downstream->info; // order = getTableScanOrder(pScanInfo); // } @@ -5463,7 +5467,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { while(1) { bool prevVal = *newgroup; - // The upstream exec may change the value of the newgroup, so use a local variable instead. + // The downstream exec may change the value of the newgroup, so use a local variable instead. publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); @@ -5472,7 +5476,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { assert(*newgroup == false); *newgroup = prevVal; - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); break; } @@ -5616,12 +5620,12 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { int32_t order = pQueryAttr->order.order; STimeWindow win = pQueryAttr->window; - SOperatorInfo* upstream = pOperator->pDownstream[0]; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { - publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = upstream->exec(upstream, newgroup); - publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5640,7 +5644,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pIntervalInfo->resultRowInfo); - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); @@ -5676,12 +5680,12 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { int32_t order = pQueryAttr->order.order; STimeWindow win = pQueryAttr->window; - SOperatorInfo* upstream = pOperator->pDownstream[0]; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { - publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = upstream->exec(upstream, newgroup); - publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5700,7 +5704,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pIntervalInfo->resultRowInfo); - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); @@ -5739,12 +5743,12 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; - SOperatorInfo* upstream = pOperator->pDownstream[0]; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { - publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = upstream->exec(upstream, newgroup); - publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5763,7 +5767,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { pOperator->status = OP_RES_TO_RETURN; pQueryAttr->order.order = order; // TODO : restore the order doCloseAllTimeWindow(pRuntimeEnv); - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { @@ -5794,12 +5798,12 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; - SOperatorInfo* upstream = pOperator->pDownstream[0]; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { - publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = upstream->exec(upstream, newgroup); - publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5818,7 +5822,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { pOperator->status = OP_RES_TO_RETURN; pQueryAttr->order.order = order; // TODO : restore the order doCloseAllTimeWindow(pRuntimeEnv); - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); int64_t st = taosGetTimestampUs(); copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); @@ -5839,7 +5843,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI SOptrBasicInfo* pBInfo = &pInfo->binfo; - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); int16_t bytes = pColInfoData->info.bytes; int16_t type = pColInfoData->info.type; @@ -5930,11 +5934,11 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; STimeWindow win = pQueryAttr->window; - SOperatorInfo* upstream = pOperator->pDownstream[0]; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = upstream->exec(upstream, newgroup); - publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5952,7 +5956,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); @@ -5991,12 +5995,12 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { int32_t order = pQueryAttr->order.order; STimeWindow win = pQueryAttr->window; - SOperatorInfo* upstream = pOperator->pDownstream[0]; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { - publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = upstream->exec(upstream, newgroup); - publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; } @@ -6012,7 +6016,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); +// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); @@ -6044,12 +6048,12 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { return pInfo->binfo.pRes; } - SOperatorInfo* upstream = pOperator->pDownstream[0]; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { - publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = upstream->exec(upstream, newgroup); - publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; } @@ -6066,7 +6070,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); +// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); @@ -6212,19 +6216,19 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { } if (pOperator->pDownstream != NULL) { - for(int32_t i = 0; i < pOperator->numOfUpstream; ++i) { + for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) { destroyOperatorInfo(pOperator->pDownstream[i]); } tfree(pOperator->pDownstream); - pOperator->numOfUpstream = 0; + pOperator->numOfDownstream = 0; } tfree(pOperator->info); tfree(pOperator); } -SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -6236,7 +6240,7 @@ SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperat initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); pInfo->seed = rand(); - setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MASTER_SCAN); + setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MAIN_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; @@ -6250,7 +6254,7 @@ SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperat pOperator->exec = doAggregate; pOperator->cleanup = destroyAggOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } @@ -6324,7 +6328,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = destroyOutputBuf(pInfo->pRes); } -SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); @@ -6345,12 +6349,12 @@ SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOp pOperator->exec = doSTableAggregate; pOperator->cleanup = destroyAggOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } -SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); pInfo->seed = rand(); @@ -6361,7 +6365,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); - setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); + setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MAIN_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ProjectOperator"; @@ -6375,7 +6379,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->exec = doProjectOperation; pOperator->cleanup = destroyProjectOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } @@ -6413,7 +6417,7 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 return 0; } -SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, +SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) { SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo)); @@ -6433,12 +6437,12 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->cleanup = destroyConditionOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } -SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream) { SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit; @@ -6451,12 +6455,12 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->exec = doLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } -SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); @@ -6476,12 +6480,12 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe pOperator->exec = doIntervalAgg; pOperator->cleanup = destroyBasicOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } -SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); @@ -6501,11 +6505,11 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S pOperator->exec = doAllIntervalAgg; pOperator->cleanup = destroyBasicOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } -SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); pInfo->colIndex = -1; pInfo->reptScan = false; @@ -6525,10 +6529,10 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pOperator->exec = doStateWindowAgg; pOperator->cleanup = destroyStateWindowOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } -SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); @@ -6550,11 +6554,11 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->exec = doSessionWindowAgg; pOperator->cleanup = destroySWindowOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } -SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); @@ -6574,11 +6578,11 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim pOperator->exec = doSTableIntervalAgg; pOperator->cleanup = destroyBasicOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } -SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); @@ -6598,13 +6602,13 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun pOperator->exec = doAllSTableIntervalAgg; pOperator->cleanup = destroyBasicOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } -SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); pInfo->colIndex = -1; // group by column index @@ -6631,11 +6635,11 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->exec = hashGroupbyAggregate; pOperator->cleanup = destroyGroupbyOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } -SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { +SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->multigroupResult = multigroupResult; @@ -6670,11 +6674,11 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf pOperator->exec = doFill; pOperator->cleanup = destroySFillOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } -SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { +SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -6718,7 +6722,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->cleanup = destroySlimitOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } @@ -6845,7 +6849,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { } if (pOperator->status == OP_EXEC_DONE) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + setTaskStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); } pRes->info.rows = count; @@ -6992,7 +6996,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } -SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); pInfo->totalBytes = 0; pInfo->buf = NULL; @@ -7016,7 +7020,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato pOperator->pExpr = pExpr; pOperator->cleanup = destroyDistinctOperatorInfo; - appendUpstream(pOperator, upstream); + appendUpstream(pOperator, downstream); return pOperator; } @@ -7170,6 +7174,92 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t return TSDB_CODE_SUCCESS; } +/** + * { + "Id": { + "QueryId": 20, + "TemplateId": 0, + "SubplanId": 0 + }, + "Node": { + "Name": "TableScan", + "InputSchema": [{ + "Type": 9, + "ColId": 1, + "Bytes": 8 + }, { + "Type": 4, + "ColId": 2, + "Bytes": 4 + }, { + "Type": 8, + "ColId": 3, + "Bytes": 20 + }], + "TableScan": { + "TableId": 1, + "TableType": 3, + "Flag": 0, + "Window": { + "StartKey": 0, + "EndKey": 0 + } + } + }, + "DataSink": { + "Name": "Dispatch", + "Dispatch": { + } + } +} + */ +int32_t parseTaskInfo(const char* msg, int32_t len) { + cJSON* pJson = cJSON_Parse(msg); + if (NULL == pJson) { + return TSDB_CODE_INVALID_MSG; + } + + cJSON* pSub = cJSON_GetObjectItem(pJson, "ID"); + if (NULL != pSub) { + printf("Id : %s\n", pSub->valuestring); + } + + cJSON* pNode = cJSON_GetObjectItem(pJson, "Node"); + if (pNode == NULL) { + return TSDB_CODE_INVALID_MSG; + } + + cJSON* pNodeName = cJSON_GetObjectItem(pNode, "name"); + if (pNodeName == NULL) { + return TSDB_CODE_INVALID_MSG; + } + + printf("node name is: %s\n", pNodeName->valuestring); + + cJSON* pNodeSchema = cJSON_GetObjectItem(pNode, "InputSchema"); + if (pNodeSchema == NULL) { + return TSDB_CODE_INVALID_MSG; + } + + cJSON* pOperator = cJSON_GetObjectItem(pNode, pNodeName->valuestring); + if (pOperator == NULL) { + return TSDB_CODE_INVALID_MSG; + } + + cJSON* pTableId = cJSON_GetObjectItem(pOperator, "tableId"); + if (pTableId == NULL) { + return TSDB_CODE_INVALID_MSG; + } + + cJSON* pTimeWindow = cJSON_GetObjectItem(pOperator, "window"); + if (pTimeWindow == NULL) { + return TSDB_CODE_INVALID_MSG; + } + + + +} + /** * pQueryMsg->head has been converted before this function is called. * @@ -8242,7 +8332,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* (!QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.ekey > pQueryAttr->window.skey))) { //qDebug("QInfo:0x%"PRIx64" no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo->qId, pQueryAttr->window.skey, // pQueryAttr->window.ekey, pQueryAttr->order.order); - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); +// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; // todo free memory return TSDB_CODE_SUCCESS; @@ -8250,7 +8340,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) { //qDebug("QInfo:0x%"PRIx64" no table qualified for tag filter, abort query", pQInfo->qId); - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); +// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); return TSDB_CODE_SUCCESS; } @@ -8416,7 +8506,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t // all data returned, set query over if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) { - setQueryStatus(pRuntimeEnv, QUERY_OVER); +// setTaskStatus(pOperator->pTaskInfo, QUERY_OVER); } } else { doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data, compressed, compLen); @@ -8427,7 +8517,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t if (pQueryAttr->limit.limit > 0 && pQueryAttr->limit.limit == pRuntimeEnv->resultInfo.total) { //qDebug("QInfo:0x%"PRIx64" results limitation reached, limitation:%"PRId64, pQInfo->qId, pQueryAttr->limit.limit); - setQueryStatus(pRuntimeEnv, QUERY_OVER); +// setTaskStatus(pOperator->pTaskInfo, QUERY_OVER); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 461f16cdf0..d6bab443ab 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -149,7 +149,7 @@ static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) { static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { // todo - return MASTER_SCAN; + return MAIN_SCAN; } static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {