diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 625541e00a..62ce56ee91 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -108,6 +108,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo); +int32_t getGroupIdFromTableTags(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); +size_t getTableTagsBufLen(const SNodeList* pGroups); + SArray* createSortInfo(SNodeList* pNodeList); SArray* extractPartitionColInfo(SNodeList* pNodeList); SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 21068c68a4..3d76a03647 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -423,6 +423,7 @@ typedef struct SStreamScanInfo { // status for tmq // SSchemaWrapper schema; STqOffset offset; + SNodeList* pGroupTags; SNode* pTagCond; SNode* pTagIndexCond; } SStreamScanInfo; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 18bb8a57f4..1a34635fe8 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -379,6 +379,82 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, return code; } +size_t getTableTagsBufLen(const SNodeList* pGroups) { + size_t keyLen = 0; + + SNode* node; + FOREACH(node, pGroups) { + SExprNode* pExpr = (SExprNode*)node; + keyLen += pExpr->resType.bytes; + } + + keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups); + return keyLen; +} + +int32_t getGroupIdFromTableTags(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId) { + SMetaReader mr = {0}; + metaReaderInit(&mr, pMeta, 0); + metaGetTableEntryByUid(&mr, uid); + + SNodeList* groupNew = nodesCloneList(pGroupNode); + + nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr); + char* isNull = (char*)keyBuf; + char* pStart = (char*)keyBuf + sizeof(int8_t)*LIST_LENGTH(pGroupNode); + + SNode* pNode; + int32_t index = 0; + FOREACH(pNode, groupNew) { + SNode* pNew = NULL; + int32_t code = scalarCalculateConstants(pNode, &pNew); + if (TSDB_CODE_SUCCESS == code) { + REPLACE_NODE(pNew); + } else { + taosMemoryFree(keyBuf); + nodesDestroyList(groupNew); + metaReaderClear(&mr); + return code; + } + + ASSERT(nodeType(pNew) == QUERY_NODE_VALUE); + SValueNode* pValue = (SValueNode*)pNew; + + if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) { + isNull[index++] = 1; + continue; + } else { + isNull[index++] = 0; + char* data = nodesGetValueFromNode(pValue); + if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) { + if (tTagIsJson(data)) { + terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; + taosMemoryFree(keyBuf); + nodesDestroyList(groupNew); + metaReaderClear(&mr); + return terrno; + } + int32_t len = getJsonValueLen(data); + memcpy(pStart, data, len); + pStart += len; + } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) { + memcpy(pStart, data, varDataTLen(data)); + pStart += varDataTLen(data); + } else { + memcpy(pStart, data, pValue->node.resType.bytes); + pStart += pValue->node.resType.bytes; + } + } + } + + int32_t len = (int32_t)(pStart - (char*)keyBuf); + *pGroupId = calcGroupId(keyBuf, len); + + nodesDestroyList(groupNew); + metaReaderClear(&mr); + return TSDB_CODE_SUCCESS; +} + SArray* extractPartitionColInfo(SNodeList* pNodeList) { if (!pNodeList) { return NULL; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b00dc9dba5..0d547c2a14 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,10 +14,21 @@ */ #include "executor.h" +#include "tref.h" #include "executorimpl.h" #include "planner.h" #include "tdatablock.h" #include "vnode.h" +#include "tudf.h" + +static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; +int32_t exchangeObjRefPool = -1; + +static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); } +static void cleanupRefPool() { + int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0); + taosCloseRef(ref); +} static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid, char* id) { @@ -184,8 +195,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { return pTaskInfo; } -static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, - const char* idstr) { +static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) { SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); // let's discard the tables those are not created according to the queried super table. @@ -239,7 +249,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo int32_t code = 0; SStreamScanInfo* pScanInfo = pInfo->info; if (isAdd) { // add new table id - SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo)); + SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo)); qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); code = tqReaderAddTbUidList(pScanInfo->tqReader, qa); @@ -247,15 +257,35 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo return code; } - // add to qTaskInfo // todo refactor STableList - for(int32_t i = 0; i < taosArrayGetSize(qa); ++i) { - uint64_t* uid = taosArrayGet(qa, i); + size_t bufLen = (pScanInfo->pGroupTags != NULL)? getTableTagsBufLen(pScanInfo->pGroupTags):0; + char* keyBuf = NULL; + if (bufLen > 0) { + keyBuf = taosMemoryMalloc(bufLen); + if (keyBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + for(int32_t i = 0; i < taosArrayGetSize(qa); ++i) { + uint64_t* uid = taosArrayGet(qa, i); STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; + + if (bufLen > 0) { + code = getGroupIdFromTableTags(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf, + &keyInfo.groupId); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); } + if (keyBuf != NULL) { + taosMemoryFree(keyBuf); + } + taosArrayDestroy(qa); } else { // remove the table id in current list qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList)); @@ -289,3 +319,378 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table return 0; } + +int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, + qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) { + assert(pSubplan != NULL); + SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; + + taosThreadOnce(&initPoolOnce, initRefPool); + atexit(cleanupRefPool); + + int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100}; + code = dsDataSinkMgtInit(&cfg); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (handle) { + void* pSinkParam = NULL; + code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); + } + + _error: + // if failed to add ref for all tables in this query, abort current query + return code; +} + +#ifdef TEST_IMPL +// wait moment +int waitMoment(SQInfo* pQInfo) { + if (pQInfo->sql) { + int ms = 0; + char* pcnt = strstr(pQInfo->sql, " count(*)"); + if (pcnt) return 0; + + char* pos = strstr(pQInfo->sql, " t_"); + if (pos) { + pos += 3; + ms = atoi(pos); + while (*pos >= '0' && *pos <= '9') { + pos++; + } + char unit_char = *pos; + if (unit_char == 'h') { + ms *= 3600 * 1000; + } else if (unit_char == 'm') { + ms *= 60 * 1000; + } else if (unit_char == 's') { + ms *= 1000; + } + } + if (ms == 0) return 0; + printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql); + + if (ms < 1000) { + taosMsleep(ms); + } else { + int used_ms = 0; + while (used_ms < ms) { + taosMsleep(1000); + used_ms += 1000; + if (isTaskKilled(pQInfo)) { + printf("test check query is canceled, sleep break.%s\n", pQInfo->sql); + break; + } + } + } + } + return 1; +} +#endif + +int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + int64_t threadId = taosGetSelfPthreadId(); + + *pRes = NULL; + int64_t curOwner = 0; + if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { + qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); + pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; + return pTaskInfo->code; + } + + if (pTaskInfo->cost.start == 0) { + pTaskInfo->cost.start = taosGetTimestampMs(); + } + + if (isTaskKilled(pTaskInfo)) { + qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); + return TSDB_CODE_SUCCESS; + } + + // error occurs, record the error code and return to client + int32_t ret = setjmp(pTaskInfo->env); + if (ret != TSDB_CODE_SUCCESS) { + pTaskInfo->code = ret; + cleanUpUdfs(); + qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); + atomic_store_64(&pTaskInfo->owner, 0); + + return pTaskInfo->code; + } + + qDebug("%s execTask is launched", GET_TASKID(pTaskInfo)); + + int64_t st = taosGetTimestampUs(); + + *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); + uint64_t el = (taosGetTimestampUs() - st); + + pTaskInfo->cost.elapsedTime += el; + if (NULL == *pRes) { + *useconds = pTaskInfo->cost.elapsedTime; + } + + cleanUpUdfs(); + + int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0; + uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; + + qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", + GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0); + + atomic_store_64(&pTaskInfo->owner, 0); + return pTaskInfo->code; +} + +int32_t qKillTask(qTaskInfo_t qinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; + + if (pTaskInfo == NULL) { + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + + qDebug("%s execTask killed", GET_TASKID(pTaskInfo)); + setTaskKilled(pTaskInfo); + + // Wait for the query executing thread being stopped/ + // Once the query is stopped, the owner of qHandle will be cleared immediately. + while (pTaskInfo->owner != 0) { + taosMsleep(100); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t qAsyncKillTask(qTaskInfo_t qinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; + + if (pTaskInfo == NULL) { + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + + qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); + setTaskKilled(pTaskInfo); + + return TSDB_CODE_SUCCESS; +} + +void qDestroyTask(qTaskInfo_t qTaskHandle) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle; + if (pTaskInfo == NULL) { + return; + } + + qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); + + queryCostStatis(pTaskInfo); // print the query cost summary + doDestroyTask(pTaskInfo); +} + +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + int32_t capacity = 0; + + return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum); +} + +int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { + SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo; + if (pTaskInfo->pRoot == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + int32_t nOptrWithVal = 0; + int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); + if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal = 0)) { + taosMemoryFreeClear(*pOutput); + *len = 0; + } + return code; +} + +int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) { + SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo; + + if (pTaskInfo == NULL || pInput == NULL || len == 0) { + return TSDB_CODE_INVALID_PARA; + } + + return decodeOperator(pTaskInfo->pRoot, pInput, len); +} + +int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + SOperatorInfo* pOperator = pTaskInfo->pRoot; + + while (1) { + uint8_t type = pOperator->operatorType; + if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + *scanner = pOperator->info; + return 0; + } else { + ASSERT(pOperator->numOfDownstream == 1); + pOperator = pOperator->pDownstream[0]; + } + } +} + +#if 0 +int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem); + return 0; +} +#endif + +int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + pTaskInfo->streamInfo.recoverStartVer = startVer; + pTaskInfo->streamInfo.recoverEndVer = endVer; + pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE; + return 0; +} + +void* qExtractReaderFromStreamScanner(void* scanner) { + SStreamScanInfo* pInfo = scanner; + return (void*)pInfo->tqReader; +} + +const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) { + SStreamScanInfo* pInfo = scanner; + return pInfo->tqReader->pSchemaWrapper; +} + +const STqOffset* qExtractStatusFromStreamScanner(void* scanner) { + SStreamScanInfo* pInfo = scanner; + return &pInfo->offset; +} + +void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); + return pTaskInfo->streamInfo.metaBlk; +} + +int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); + memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal)); + return 0; +} + +int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + SOperatorInfo* pOperator = pTaskInfo->pRoot; + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); + pTaskInfo->streamInfo.prepareStatus = *pOffset; + if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { + while (1) { + uint8_t type = pOperator->operatorType; + pOperator->status = OP_OPENED; + if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pInfo = pOperator->info; + if (pOffset->type == TMQ_OFFSET__LOG) { +#if 0 + if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && + pInfo->tqReader->pWalReader->curVersion != pOffset->version) { + qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version, + pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version); + ASSERT(0); + } +#endif + if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { + return -1; + } + ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1); + } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ + int64_t uid = pOffset->uid; + int64_t ts = pOffset->ts; + + if (uid == 0) { + if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); + uid = pTableInfo->uid; + ts = INT64_MIN; + } else { + return -1; + } + } + /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ + /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + bool found = false; + for (int32_t i = 0; i < tableSz; i++) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); + if (pTableInfo->uid == uid) { + found = true; + pTableScanInfo->currentTable = i; + break; + } + } + + // TODO after dropping table, table may be not found + ASSERT(found); + + tsdbSetTableId(pTableScanInfo->dataReader, uid); + int64_t oldSkey = pTableScanInfo->cond.twindows.skey; + pTableScanInfo->cond.twindows.skey = ts + 1; + tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); + pTableScanInfo->cond.twindows.skey = oldSkey; + pTableScanInfo->scanTimes = 0; + + qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts, + pTableScanInfo->currentTable, tableSz); + /*}*/ + + } else { + ASSERT(0); + } + return 0; + } else { + ASSERT(pOperator->numOfDownstream == 1); + pOperator = pOperator->pDownstream[0]; + } + } + } + return 0; +} + +#if 0 +int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + + if (uid == 0) { + if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); + uid = pTableInfo->uid; + ts = INT64_MIN; + } + } + + return doPrepareScan(pTaskInfo->pRoot, uid, ts); +} + +int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + + return doGetScanStatus(pTaskInfo->pRoot, uid, ts); +} +#endif + diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c deleted file mode 100644 index e0020a496e..0000000000 --- a/source/libs/executor/src/executorMain.c +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "dataSinkMgt.h" -#include "os.h" -#include "tmsg.h" -#include "tref.h" -#include "tudf.h" - -#include "executor.h" -#include "executorimpl.h" -#include "query.h" - -static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; -int32_t exchangeObjRefPool = -1; - -static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); } -static void cleanupRefPool() { - int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0); - taosCloseRef(ref); -} - -int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, - qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) { - assert(pSubplan != NULL); - SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - - taosThreadOnce(&initPoolOnce, initRefPool); - atexit(cleanupRefPool); - - int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100}; - code = dsDataSinkMgtInit(&cfg); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - if (handle) { - void* pSinkParam = NULL; - code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); - } - -_error: - // if failed to add ref for all tables in this query, abort current query - return code; -} - -#ifdef TEST_IMPL -// wait moment -int waitMoment(SQInfo* pQInfo) { - if (pQInfo->sql) { - int ms = 0; - char* pcnt = strstr(pQInfo->sql, " count(*)"); - if (pcnt) return 0; - - char* pos = strstr(pQInfo->sql, " t_"); - if (pos) { - pos += 3; - ms = atoi(pos); - while (*pos >= '0' && *pos <= '9') { - pos++; - } - char unit_char = *pos; - if (unit_char == 'h') { - ms *= 3600 * 1000; - } else if (unit_char == 'm') { - ms *= 60 * 1000; - } else if (unit_char == 's') { - ms *= 1000; - } - } - if (ms == 0) return 0; - printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql); - - if (ms < 1000) { - taosMsleep(ms); - } else { - int used_ms = 0; - while (used_ms < ms) { - taosMsleep(1000); - used_ms += 1000; - if (isTaskKilled(pQInfo)) { - printf("test check query is canceled, sleep break.%s\n", pQInfo->sql); - break; - } - } - } - } - return 1; -} -#endif - -int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int64_t threadId = taosGetSelfPthreadId(); - - *pRes = NULL; - int64_t curOwner = 0; - if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { - qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); - pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; - return pTaskInfo->code; - } - - if (pTaskInfo->cost.start == 0) { - pTaskInfo->cost.start = taosGetTimestampMs(); - } - - if (isTaskKilled(pTaskInfo)) { - qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); - return TSDB_CODE_SUCCESS; - } - - // error occurs, record the error code and return to client - int32_t ret = setjmp(pTaskInfo->env); - if (ret != TSDB_CODE_SUCCESS) { - pTaskInfo->code = ret; - cleanUpUdfs(); - qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); - return pTaskInfo->code; - } - - qDebug("%s execTask is launched", GET_TASKID(pTaskInfo)); - - int64_t st = taosGetTimestampUs(); - - *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); - uint64_t el = (taosGetTimestampUs() - st); - - pTaskInfo->cost.elapsedTime += el; - if (NULL == *pRes) { - *useconds = pTaskInfo->cost.elapsedTime; - } - - cleanUpUdfs(); - - int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0; - uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; - - qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", - GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0); - - atomic_store_64(&pTaskInfo->owner, 0); - return pTaskInfo->code; -} - -int32_t qKillTask(qTaskInfo_t qinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; - - if (pTaskInfo == NULL) { - return TSDB_CODE_QRY_INVALID_QHANDLE; - } - - qDebug("%s execTask killed", GET_TASKID(pTaskInfo)); - setTaskKilled(pTaskInfo); - - // Wait for the query executing thread being stopped/ - // Once the query is stopped, the owner of qHandle will be cleared immediately. - while (pTaskInfo->owner != 0) { - taosMsleep(100); - } - - return TSDB_CODE_SUCCESS; -} - -int32_t qAsyncKillTask(qTaskInfo_t qinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; - - if (pTaskInfo == NULL) { - return TSDB_CODE_QRY_INVALID_QHANDLE; - } - - qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); - setTaskKilled(pTaskInfo); - - return TSDB_CODE_SUCCESS; -} - -void qDestroyTask(qTaskInfo_t qTaskHandle) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle; - if (pTaskInfo == NULL) { - return; - } - - qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); - - queryCostStatis(pTaskInfo); // print the query cost summary - doDestroyTask(pTaskInfo); -} - -int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t capacity = 0; - - return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum); -} - -int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { - SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo; - if (pTaskInfo->pRoot == NULL) { - return TSDB_CODE_INVALID_PARA; - } - - int32_t nOptrWithVal = 0; - int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); - if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal = 0)) { - taosMemoryFreeClear(*pOutput); - *len = 0; - } - return code; -} - -int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) { - SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo; - - if (pTaskInfo == NULL || pInput == NULL || len == 0) { - return TSDB_CODE_INVALID_PARA; - } - - return decodeOperator(pTaskInfo->pRoot, pInput, len); -} - -int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - SOperatorInfo* pOperator = pTaskInfo->pRoot; - - while (1) { - uint8_t type = pOperator->operatorType; - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - *scanner = pOperator->info; - return 0; - } else { - ASSERT(pOperator->numOfDownstream == 1); - pOperator = pOperator->pDownstream[0]; - } - } -} - -#if 0 -int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); - taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem); - return 0; -} -#endif - -int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); - pTaskInfo->streamInfo.recoverStartVer = startVer; - pTaskInfo->streamInfo.recoverEndVer = endVer; - pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE; - return 0; -} - -void* qExtractReaderFromStreamScanner(void* scanner) { - SStreamScanInfo* pInfo = scanner; - return (void*)pInfo->tqReader; -} - -const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) { - SStreamScanInfo* pInfo = scanner; - return pInfo->tqReader->pSchemaWrapper; -} - -const STqOffset* qExtractStatusFromStreamScanner(void* scanner) { - SStreamScanInfo* pInfo = scanner; - return &pInfo->offset; -} - -void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); - return pTaskInfo->streamInfo.metaBlk; -} - -int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); - memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal)); - return 0; -} - -int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - SOperatorInfo* pOperator = pTaskInfo->pRoot; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); - pTaskInfo->streamInfo.prepareStatus = *pOffset; - if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { - while (1) { - uint8_t type = pOperator->operatorType; - pOperator->status = OP_OPENED; - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamScanInfo* pInfo = pOperator->info; - if (pOffset->type == TMQ_OFFSET__LOG) { -#if 0 - if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && - pInfo->tqReader->pWalReader->curVersion != pOffset->version) { - qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version, - pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version); - ASSERT(0); - } -#endif - if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { - return -1; - } - ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1); - } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ - int64_t uid = pOffset->uid; - int64_t ts = pOffset->ts; - - if (uid == 0) { - if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); - uid = pTableInfo->uid; - ts = INT64_MIN; - } else { - return -1; - } - } - /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ - /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); - bool found = false; - for (int32_t i = 0; i < tableSz; i++) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); - if (pTableInfo->uid == uid) { - found = true; - pTableScanInfo->currentTable = i; - break; - } - } - - // TODO after dropping table, table may be not found - ASSERT(found); - - tsdbSetTableId(pTableScanInfo->dataReader, uid); - int64_t oldSkey = pTableScanInfo->cond.twindows.skey; - pTableScanInfo->cond.twindows.skey = ts + 1; - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); - pTableScanInfo->cond.twindows.skey = oldSkey; - pTableScanInfo->scanTimes = 0; - - qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts, - pTableScanInfo->currentTable, tableSz); - /*}*/ - - } else { - ASSERT(0); - } - return 0; - } else { - ASSERT(pOperator->numOfDownstream == 1); - pOperator = pOperator->pDownstream[0]; - } - } - } - return 0; -} - -#if 0 -int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - - if (uid == 0) { - if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); - uid = pTableInfo->uid; - ts = INT64_MIN; - } - } - - return doPrepareScan(pTaskInfo->pRoot, uid, ts); -} - -int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - - return doGetScanStatus(pTaskInfo->pRoot, uid, ts); -} -#endif diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a930a7bb46..d3bfac82b6 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3187,7 +3187,8 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { return TDB_CODE_SUCCESS; } -int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) { +int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SSDataBlock** pExistedBlock, + bool holdDataInBuf) { if (pLimitInfo->remainGroupOffset > 0) { if (pLimitInfo->currentGroupId == 0) { // it is the first group pLimitInfo->currentGroupId = pBlock->info.groupId; @@ -3208,6 +3209,7 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa pLimitInfo->currentGroupId = pBlock->info.groupId; } + // here check for a new group data, we need to handle the data of the previous group. if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) { pLimitInfo->numOfOutputGroups += 1; if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { @@ -3220,6 +3222,13 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa // reset the value for a new group data pLimitInfo->numOfOutputRows = 0; pLimitInfo->remainOffset = pLimitInfo->limit.offset; + + *pExistedBlock = pBlock; + + // existing rows that belongs to previous group. + if (pBlock->info.rows > 0) { + return PROJECT_RETRIEVE_DONE; + } } // here we reach the start position, according to the limit/offset requirements. @@ -3305,18 +3314,12 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { while (1) { // The downstream exec may change the value of the newgroup, so use a local variable instead. - qDebug("projection call next"); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - qDebug("projection get null"); - - /*if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH) {*/ doSetOperatorCompleted(pOperator); - /*} else if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {*/ - /*pOperator->status = OP_RES_TO_RETURN;*/ - /*}*/ break; } + if (pBlock->info.type == STREAM_RETRIEVE) { // for stream interval return pBlock; @@ -4133,9 +4136,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT return pTaskInfo; } -static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableListInfo, const char* idstr); - static SArray* extractColumnInfo(SNodeList* pNodeList); SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); @@ -4292,69 +4292,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, int32_t groupNum = 0; for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - SMetaReader mr = {0}; - metaReaderInit(&mr, pHandle->meta, 0); - metaGetTableEntryByUid(&mr, info->uid); - - SNodeList* groupNew = nodesCloneList(group); - - nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr); - char* isNull = (char*)keyBuf; - char* pStart = (char*)keyBuf + nullFlagSize; - - SNode* pNode; - int32_t index = 0; - FOREACH(pNode, groupNew) { - SNode* pNew = NULL; - int32_t code = scalarCalculateConstants(pNode, &pNew); - if (TSDB_CODE_SUCCESS == code) { - REPLACE_NODE(pNew); - } else { - taosMemoryFree(keyBuf); - nodesDestroyList(groupNew); - metaReaderClear(&mr); - return code; - } - - ASSERT(nodeType(pNew) == QUERY_NODE_VALUE); - SValueNode* pValue = (SValueNode*)pNew; - - if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) { - isNull[index++] = 1; - continue; - } else { - isNull[index++] = 0; - char* data = nodesGetValueFromNode(pValue); - if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) { - if (tTagIsJson(data)) { - terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; - taosMemoryFree(keyBuf); - nodesDestroyList(groupNew); - metaReaderClear(&mr); - return terrno; - } - int32_t len = getJsonValueLen(data); - memcpy(pStart, data, len); - pStart += len; - } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) { - memcpy(pStart, data, varDataTLen(data)); - pStart += varDataTLen(data); - } else { - memcpy(pStart, data, pValue->node.resType.bytes); - pStart += pValue->node.resType.bytes; - } - } + int32_t code = getGroupIdFromTableTags(pHandle->meta, info->uid, group, keyBuf, &info->groupId); + if (code != TSDB_CODE_SUCCESS) { + return code; } - int32_t len = (int32_t)(pStart - (char*)keyBuf); - uint64_t groupId = calcGroupId(keyBuf, len); - taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t)); - info->groupId = groupId; + taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); groupNum++; - - nodesDestroyList(groupNew); - metaReaderClear(&mr); } + taosMemoryFree(keyBuf); if (pTableListInfo->needSortTableByGroupId) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ab62905c3f..2c206fbc12 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1538,7 +1538,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; pInfo->pTagCond = pTagCond; - + pInfo->pGroupTags = pTableScanNode->pGroupTags; pInfo->twAggSup = *pTwSup; int32_t numOfCols = 0;