From 1145d1a0ea1248f9de562400f3b9d8baf7b53bcd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Jan 2022 16:20:13 +0800 Subject: [PATCH] [td-11818] select * --- source/client/src/clientImpl.c | 2 +- source/libs/executor/src/executorMain.c | 20 ++++++++++---------- source/libs/executor/src/executorimpl.c | 7 ++++++- source/libs/planner/src/physicalPlanJson.c | 8 ++++---- source/libs/qworker/src/qworker.c | 2 ++ 5 files changed, 23 insertions(+), 16 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 8fdde9f4e9..10d1d09c85 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -259,7 +259,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr)); - SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 1}; + SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 2}; addr.epAddr[0].port = 6030; strcpy(addr.epAddr[0].fqdn, "localhost"); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 9fad5242ba..56e2977753 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -73,12 +73,12 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ assert(tsdb != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = 0; - uint64_t uid = 0; - STimeWindow window = TSWINDOW_INITIALIZER; - int32_t tableType = 0; + int32_t code = 0; + uint64_t uid = 0; + STimeWindow window = TSWINDOW_INITIALIZER; + int32_t tableType = 0; - SPhyNode *pPhyNode = pSubplan->pNode; + SPhyNode* pPhyNode = pSubplan->pNode; STableGroupInfo groupInfo = {0}; int32_t type = pPhyNode->info.type; @@ -112,10 +112,10 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ } } - code = doCreateExecTaskInfo(pSubplan, pTask, &groupInfo, tsdb); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + code = doCreateExecTaskInfo(pSubplan, pTask, &groupInfo, tsdb); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100}; code = dsDataSinkMgtInit(&cfg); @@ -127,7 +127,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ *handle = (*pTask)->dsHandle; - _error: +_error: // if failed to add ref for all tables in this query, abort current query return code; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1b3c93ab61..4cb233f71e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4952,7 +4952,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { } static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; + SOperatorInfo *pOperator = (SOperatorInfo*) param; SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; @@ -5012,9 +5012,14 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { size_t len = pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes; memcpy(tmp, pData, len); + + pColInfoData->pData = tmp; pData += len; } + pRes->info.numOfCols = pOperator->numOfOutput; + pRes->info.rows = pExchangeInfo->pRsp->numOfRows; + return pExchangeInfo->pResult; } diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 1b6cf89c56..bf38712b13 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -559,10 +559,10 @@ static bool timeWindowFromJson(const cJSON* json, void* obj) { STimeWindow* win = (STimeWindow*)obj; char* pStartKey = getString(json, jkTimeWindowStartKey); - win->skey = strtoll(pStartKey, NULL, 10); + win->skey = strtoul(pStartKey, NULL, 10); char* pEndKey = getString(json, jkTimeWindowEndKey); - win->ekey = strtoll(pEndKey, NULL, 10); + win->ekey = strtoul(pEndKey, NULL, 10); tfree(pStartKey); tfree(pEndKey); @@ -783,7 +783,7 @@ static bool nodeAddrFromJson(const cJSON* json, void* obj) { pSource->taskId = getNumber(json, jkNodeTaskId); char* pSchedId = getString(json, jkNodeTaskSchedId); - pSource->schedId = strtoll(pSchedId, NULL, 10); + pSource->schedId = strtoul(pSchedId, NULL, 10); tfree(pSchedId); bool res = fromObject(json, jkNodeAddr, queryNodeAddrFromJson, &pSource->addr, true); @@ -1032,7 +1032,7 @@ static bool subplanIdFromJson(const cJSON* json, void* obj) { SSubplanId* id = (SSubplanId*)obj; char* queryId = getString(json, jkIdQueryId); - id->queryId = strtoll(queryId, NULL, 0); + id->queryId = strtoul(queryId, NULL, 0); tfree(queryId); id->templateId = getNumber(json, jkIdTemplateId); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a0beaba61d..d01f4f4e52 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -258,6 +258,8 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); + printf("%"PRIx64", tid:%"PRIx64"\n", qId, tId); + SQWTaskCtx nctx = {0}; QW_LOCK(QW_WRITE, &mgmt->ctxLock);