From f8bde073403169ee928a11bc5c104806e86b9e63 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 4 Mar 2022 02:17:04 -0500 Subject: [PATCH] TD-13037 datasink code --- include/libs/nodes/plannodes.h | 4 +- source/libs/executor/src/dataDispatcher.c | 16 +- source/libs/nodes/src/nodesCloneFuncs.c | 23 ++- source/libs/nodes/src/nodesCodeFuncs.c | 177 +++++++++++++--------- source/libs/nodes/src/nodesUtilFuncs.c | 1 + source/libs/planner/src/physicalPlan.c | 48 +++--- source/libs/planner/test/plannerTest.cpp | 13 +- source/util/src/tjson.c | 13 +- 8 files changed, 180 insertions(+), 115 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index d72befcaf0..f07183d64f 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -113,7 +113,7 @@ typedef struct SDataBlockDescNode { typedef struct SPhysiNode { ENodeType type; - SDataBlockDescNode outputDataBlockDesc; + SDataBlockDescNode* pOutputDataBlockDesc; SNode* pConditions; SNodeList* pChildren; struct SPhysiNode* pParent; @@ -175,7 +175,7 @@ typedef struct SExchangePhysiNode { typedef struct SDataSinkNode { ENodeType type;; - SDataBlockDescNode inputDataBlockDesc; + SDataBlockDescNode* pInputDataBlockDesc; } SDataSinkNode; typedef struct SDataDispatcherNode { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 4f1b83ad5d..9c86e19166 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -39,7 +39,7 @@ typedef struct SDataCacheEntry { typedef struct SDataDispatchHandle { SDataSinkHandle sink; SDataSinkManager* pManager; - SDataBlockDescNode schema; + SDataBlockDescNode* pSchema; STaosQueue* pDataBlocks; SDataDispatchBuf nextOutput; int32_t status; @@ -109,14 +109,14 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema // data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; - pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema)); + pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema); pEntry->numOfRows = pInput->pData->info.rows; pEntry->dataLen = 0; pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap); - copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen); + copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen); if (0 == pEntry->compressed) { - pEntry->dataLen = pHandle->schema.resultRowSize * pInput->pData->info.rows; + pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows; } pBuf->useSize += pEntry->dataLen; // todo completed @@ -130,7 +130,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, return false; } - pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows; + pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows; pBuf->pData = malloc(pBuf->allocSize); if (pBuf->pData == NULL) { qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); @@ -196,7 +196,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { if (NULL == pDispatcher->nextOutput.pData) { assert(pDispatcher->queryEnd); pOutput->useconds = pDispatcher->useconds; - pOutput->precision = pDispatcher->schema.precision; + pOutput->precision = pDispatcher->pSchema->precision; return TSDB_CODE_SUCCESS; } SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); @@ -208,7 +208,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { pthread_mutex_lock(&pDispatcher->mutex); pOutput->queryEnd = pDispatcher->queryEnd; pOutput->useconds = pDispatcher->useconds; - pOutput->precision = pDispatcher->schema.precision; + pOutput->precision = pDispatcher->pSchema->precision; pthread_mutex_unlock(&pDispatcher->mutex); return TSDB_CODE_SUCCESS; } @@ -238,7 +238,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD dispatcher->sink.fGetData = getDataBlock; dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->pManager = pManager; - dispatcher->schema = pDataSink->inputDataBlockDesc; + dispatcher->pSchema = pDataSink->pInputDataBlockDesc; dispatcher->status = DS_BUF_EMPTY; dispatcher->queryEnd = false; dispatcher->pDataBlocks = taosOpenQueue(); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index befe96050a..403b177a97 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -165,6 +165,23 @@ static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) { return (SNode*)pDst; } +static SNode* dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) { + COPY_SCALAR_FIELD(dataBlockId); + COPY_NODE_LIST_FIELD(pSlots); + COPY_SCALAR_FIELD(resultRowSize); + COPY_SCALAR_FIELD(precision); + return (SNode*)pDst; +} + +static SNode* slotDescCopy(const SSlotDescNode* pSrc, SSlotDescNode* pDst) { + COPY_SCALAR_FIELD(slotId); + dataTypeCopy(&pSrc->dataType, &pDst->dataType); + COPY_SCALAR_FIELD(reserve); + COPY_SCALAR_FIELD(output); + COPY_SCALAR_FIELD(tag); + return (SNode*)pDst; +} + SNodeptr nodesCloneNode(const SNodeptr pNode) { if (NULL == pNode) { return NULL; @@ -196,8 +213,12 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { case QUERY_NODE_ORDER_BY_EXPR: case QUERY_NODE_LIMIT: break; + case QUERY_NODE_DATABLOCK_DESC: + return dataBlockDescCopy((const SDataBlockDescNode*)pNode, (SDataBlockDescNode*)pDst); + case QUERY_NODE_SLOT_DESC: + return slotDescCopy((const SSlotDescNode*)pNode, (SSlotDescNode*)pDst); case QUERY_NODE_LOGIC_SUBPLAN: - return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst); + return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst); default: break; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 284798a3ba..1f99a00bba 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -262,7 +262,7 @@ static const char* jkPhysiPlanChildren = "Children"; static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) { const SPhysiNode* pNode = (const SPhysiNode*)pObj; - int32_t code = tjsonAddObject(pJson, jkPhysiPlanOutputDataBlockDesc, nodeToJson, &pNode->outputDataBlockDesc); + int32_t code = tjsonAddObject(pJson, jkPhysiPlanOutputDataBlockDesc, nodeToJson, pNode->pOutputDataBlockDesc); if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkPhysiPlanConditions, nodeToJson, pNode->pConditions); } @@ -276,7 +276,7 @@ static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) { static int32_t jsonToPhysicPlanNode(const SJson* pJson, void* pObj) { SPhysiNode* pNode = (SPhysiNode*)pObj; - int32_t code = tjsonToObject(pJson, jkPhysiPlanOutputDataBlockDesc, jsonToNode, &pNode->outputDataBlockDesc); + int32_t code = jsonToNodeObject(pJson, jkPhysiPlanOutputDataBlockDesc, (SNode**)&pNode->pOutputDataBlockDesc); if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkPhysiPlanConditions, &pNode->pConditions); } @@ -494,6 +494,26 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc"; + +static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) { + const SDataSinkNode* pNode = (const SDataSinkNode*)pObj; + return tjsonAddObject(pJson, jkDataSinkInputDataBlockDesc, nodeToJson, pNode->pInputDataBlockDesc); +} + +static int32_t jsonToPhysicDataSinkNode(const SJson* pJson, void* pObj) { + SDataSinkNode* pNode = (SDataSinkNode*)pObj; + return jsonToNodeObject(pJson, jkDataSinkInputDataBlockDesc, (SNode**)&pNode->pInputDataBlockDesc); +} + +static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) { + return physicDataSinkNodeToJson(pObj, pJson); +} + +static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) { + return jsonToPhysicDataSinkNode(pJson, pObj); +} + static const char* jkSubplanIdQueryId = "QueryId"; static const char* jkSubplanIdTemplateId = "TemplateId"; static const char* jkSubplanIdSubplanId = "SubplanId"; @@ -861,41 +881,43 @@ static int32_t valueNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkValueDuration, pNode->isDuration); } - switch (pNode->node.resType.type) { - case TSDB_DATA_TYPE_NULL: - break; - case TSDB_DATA_TYPE_BOOL: - code = tjsonAddIntegerToObject(pJson, jkValueDuration, pNode->datum.b); - break; - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_TIMESTAMP: - code = tjsonAddIntegerToObject(pJson, jkValueDuration, pNode->datum.i); - break; - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_UBIGINT: - code = tjsonAddIntegerToObject(pJson, jkValueDuration, pNode->datum.u); - break; - case TSDB_DATA_TYPE_FLOAT: - case TSDB_DATA_TYPE_DOUBLE: - code = tjsonAddDoubleToObject(pJson, jkValueDuration, pNode->datum.d); - break; - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_VARCHAR: - case TSDB_DATA_TYPE_VARBINARY: - code = tjsonAddStringToObject(pJson, jkValueLiteral, pNode->datum.p); - break; - case TSDB_DATA_TYPE_JSON: - case TSDB_DATA_TYPE_DECIMAL: - case TSDB_DATA_TYPE_BLOB: - // todo - default: - break; + if (TSDB_CODE_SUCCESS == code) { + switch (pNode->node.resType.type) { + case TSDB_DATA_TYPE_NULL: + break; + case TSDB_DATA_TYPE_BOOL: + code = tjsonAddIntegerToObject(pJson, jkValueDatum, pNode->datum.b); + break; + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: + code = tjsonAddIntegerToObject(pJson, jkValueDatum, pNode->datum.i); + break; + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_UBIGINT: + code = tjsonAddIntegerToObject(pJson, jkValueDatum, pNode->datum.u); + break; + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: + code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d); + break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_VARBINARY: + code = tjsonAddStringToObject(pJson, jkValueDatum, pNode->datum.p); + break; + case TSDB_DATA_TYPE_JSON: + case TSDB_DATA_TYPE_DECIMAL: + case TSDB_DATA_TYPE_BLOB: + // todo + default: + break; + } } return code; @@ -911,41 +933,43 @@ static int32_t jsonToValueNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkValueDuration, &pNode->isDuration); } - switch (pNode->node.resType.type) { - case TSDB_DATA_TYPE_NULL: - break; - case TSDB_DATA_TYPE_BOOL: - code = tjsonGetBoolValue(pJson, jkValueDuration, &pNode->datum.b); - break; - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_TIMESTAMP: - code = tjsonGetBigIntValue(pJson, jkValueDuration, &pNode->datum.i); - break; - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_UBIGINT: - code = tjsonGetUBigIntValue(pJson, jkValueDuration, &pNode->datum.u); - break; - case TSDB_DATA_TYPE_FLOAT: - case TSDB_DATA_TYPE_DOUBLE: - code = tjsonGetDoubleValue(pJson, jkValueDuration, &pNode->datum.d); - break; - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_VARCHAR: - case TSDB_DATA_TYPE_VARBINARY: - code = tjsonDupStringValue(pJson, jkValueLiteral, &pNode->datum.p); - break; - case TSDB_DATA_TYPE_JSON: - case TSDB_DATA_TYPE_DECIMAL: - case TSDB_DATA_TYPE_BLOB: - // todo - default: - break; + if (TSDB_CODE_SUCCESS == code) { + switch (pNode->node.resType.type) { + case TSDB_DATA_TYPE_NULL: + break; + case TSDB_DATA_TYPE_BOOL: + code = tjsonGetBoolValue(pJson, jkValueDatum, &pNode->datum.b); + break; + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: + code = tjsonGetBigIntValue(pJson, jkValueDatum, &pNode->datum.i); + break; + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_UBIGINT: + code = tjsonGetUBigIntValue(pJson, jkValueDatum, &pNode->datum.u); + break; + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: + code = tjsonGetDoubleValue(pJson, jkValueDatum, &pNode->datum.d); + break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_VARBINARY: + code = tjsonDupStringValue(pJson, jkValueDatum, &pNode->datum.p); + break; + case TSDB_DATA_TYPE_JSON: + case TSDB_DATA_TYPE_DECIMAL: + case TSDB_DATA_TYPE_BLOB: + // todo + default: + break; + } } return code; @@ -1328,6 +1352,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: case QUERY_NODE_PHYSICAL_PLAN_SORT: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: + return physiDispatchNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_INSERT: break; case QUERY_NODE_PHYSICAL_SUBPLAN: @@ -1397,6 +1422,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiJoinNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_AGG: return jsonToPhysiAggNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: + return jsonToPhysiDispatchNode(pJson, pObj); case QUERY_NODE_PHYSICAL_SUBPLAN: return jsonToSubplan(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN: @@ -1404,6 +1431,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { default: break; } + printf("================================ jsonToSpecificNode unknown node = %s\n", nodesNodeName(nodeType(pObj))); return TSDB_CODE_SUCCESS; } @@ -1432,6 +1460,9 @@ static int32_t jsonToNode(const SJson* pJson, void* pObj) { pNode->type = val; if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, nodesNodeName(pNode->type), jsonToSpecificNode, pNode); + if (TSDB_CODE_SUCCESS != code) { + printf("%s toNode error\n", nodesNodeName(pNode->type)); + } } return code; @@ -1454,7 +1485,7 @@ static int32_t makeNodeByJson(const SJson* pJson, SNode** pNode) { static int32_t jsonToNodeObject(const SJson* pJson, const char* pName, SNode** pNode) { SJson* pJsonNode = tjsonGetObjectItem(pJson, pName); if (NULL == pJsonNode) { - return TSDB_CODE_FAILED; + return TSDB_CODE_SUCCESS; } return makeNodeByJson(pJsonNode, pNode); } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 498668d231..49f2afc823 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -131,6 +131,7 @@ SNodeptr nodesMakeNode(ENodeType type) { default: break; } + printf("================================ nodesMakeNode unknown node = %s\n", nodesNodeName(type)); return NULL; } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 66ad0a9879..9df2b9b708 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -153,17 +153,20 @@ static SNodeList* setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, ENodeType type) { SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type); - if (NULL == pPhysiNode) { + CHECK_ALLOC(pPhysiNode, NULL); + pPhysiNode->pOutputDataBlockDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC); + if (NULL == pPhysiNode->pOutputDataBlockDesc) { + nodesDestroyNode(pPhysiNode); return NULL; } - pPhysiNode->outputDataBlockDesc.dataBlockId = pCxt->nextDataBlockId++; - pPhysiNode->outputDataBlockDesc.type = QUERY_NODE_DATABLOCK_DESC; + pPhysiNode->pOutputDataBlockDesc->dataBlockId = pCxt->nextDataBlockId++; + pPhysiNode->pOutputDataBlockDesc->type = QUERY_NODE_DATABLOCK_DESC; return pPhysiNode; } static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) { if (NULL != pLogicNode->pConditions) { - pPhysiNode->pConditions = setNodeSlotId(pCxt, pPhysiNode->outputDataBlockDesc.dataBlockId, -1, pLogicNode->pConditions); + pPhysiNode->pConditions = setNodeSlotId(pCxt, pPhysiNode->pOutputDataBlockDesc->dataBlockId, -1, pLogicNode->pConditions); CHECK_ALLOC(pPhysiNode->pConditions, TSDB_CODE_OUT_OF_MEMORY); } return TSDB_CODE_SUCCESS; @@ -188,11 +191,11 @@ static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanL CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); } // Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t - CHECK_CODE(addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, &pScanPhysiNode->node.outputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY); + CHECK_CODE(addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY); CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode), TSDB_CODE_OUT_OF_MEMORY); - CHECK_CODE(setSlotOutput(pCxt, pScanLogicNode->node.pTargets, &pScanPhysiNode->node.outputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY); + CHECK_CODE(setSlotOutput(pCxt, pScanLogicNode->node.pTargets, pScanPhysiNode->node.pOutputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY); pScanPhysiNode->uid = pScanLogicNode->pMeta->uid; pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType; @@ -276,18 +279,18 @@ static SPhysiNode* createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_JOIN); CHECK_ALLOC(pJoin, NULL); - SDataBlockDescNode* pLeftDesc = &((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc; - SDataBlockDescNode* pRightDesc = &((SPhysiNode*)nodesListGetNode(pChildren, 1))->outputDataBlockDesc; + SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; + SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc; pJoin->pOnConditions = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions); CHECK_ALLOC(pJoin->pOnConditions, (SPhysiNode*)pJoin); pJoin->pTargets = createJoinOutputCols(pCxt, pLeftDesc, pRightDesc); CHECK_ALLOC(pJoin->pTargets, (SPhysiNode*)pJoin); - CHECK_CODE(addDataBlockDesc(pCxt, pJoin->pTargets, &pJoin->node.outputDataBlockDesc), (SPhysiNode*)pJoin); + CHECK_CODE(addDataBlockDesc(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc), (SPhysiNode*)pJoin); CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin), (SPhysiNode*)pJoin); - CHECK_CODE(setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, &pJoin->node.outputDataBlockDesc), (SPhysiNode*)pJoin); + CHECK_CODE(setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, pJoin->node.pOutputDataBlockDesc), (SPhysiNode*)pJoin); return (SPhysiNode*)pJoin; } @@ -385,8 +388,8 @@ static SPhysiNode* createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys), (SPhysiNode*)pAgg); CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs), (SPhysiNode*)pAgg); - SDataBlockDescNode* pChildTupe = &(((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc); - // push down expression to outputDataBlockDesc of child node + SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); + // push down expression to pOutputDataBlockDesc of child node if (NULL != pPrecalcExprs) { pAgg->pExprs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs); CHECK_ALLOC(pAgg->pExprs, (SPhysiNode*)pAgg); @@ -396,18 +399,18 @@ static SPhysiNode* createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild if (NULL != pGroupKeys) { pAgg->pGroupKeys = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys); CHECK_ALLOC(pAgg->pGroupKeys, (SPhysiNode*)pAgg); - CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pGroupKeys, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg); + CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc), (SPhysiNode*)pAgg); } if (NULL != pAggFuncs) { pAgg->pAggFuncs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs); CHECK_ALLOC(pAgg->pAggFuncs, (SPhysiNode*)pAgg); - CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pAggFuncs, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg); + CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc), (SPhysiNode*)pAgg); } CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg), (SPhysiNode*)pAgg); - CHECK_CODE(setSlotOutput(pCxt, pAggLogicNode->node.pTargets, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg); + CHECK_CODE(setSlotOutput(pCxt, pAggLogicNode->node.pTargets, pAgg->node.pOutputDataBlockDesc), (SPhysiNode*)pAgg); return (SPhysiNode*)pAgg; } @@ -416,9 +419,9 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_PROJECT); CHECK_ALLOC(pProject, NULL); - pProject->pProjections = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc.dataBlockId, -1, pProjectLogicNode->pProjections); + pProject->pProjections = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId, -1, pProjectLogicNode->pProjections); CHECK_ALLOC(pProject->pProjections, (SPhysiNode*)pProject); - CHECK_CODE(addDataBlockDesc(pCxt, pProject->pProjections, &pProject->node.outputDataBlockDesc), (SPhysiNode*)pProject); + CHECK_CODE(addDataBlockDesc(pCxt, pProject->pProjections, pProject->node.pOutputDataBlockDesc), (SPhysiNode*)pProject); CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject), (SPhysiNode*)pProject); @@ -468,12 +471,21 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl static SDataSinkNode* createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks) { SDataInserterNode* pInserter = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT); + CHECK_ALLOC(pInserter, NULL); pInserter->numOfTables = pBlocks->numOfTables; pInserter->size = pBlocks->size; TSWAP(pInserter->pData, pBlocks->pData, char*); return (SDataSinkNode*)pInserter; } +static SDataSinkNode* createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot) { + SDataDispatcherNode* pDispatcher = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH); + CHECK_ALLOC(pDispatcher, NULL); + pDispatcher->sink.pInputDataBlockDesc = nodesCloneNode(pRoot->pOutputDataBlockDesc); + CHECK_ALLOC(pDispatcher->sink.pInputDataBlockDesc, (SDataSinkNode*)pDispatcher); + return (SDataSinkNode*)pDispatcher; +} + static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan) { SSubplan* pSubplan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); CHECK_ALLOC(pSubplan, NULL); @@ -483,7 +495,7 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog pSubplan->msgType = pModif->msgType; } else { pSubplan->pNode = createPhysiNode(pCxt, pLogicSubplan->pNode); - // pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode); + pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode); } pSubplan->subplanType = pLogicSubplan->subplanType; return pSubplan; diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 2a6e7a9a55..4bdd9d21ee 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -109,6 +109,14 @@ private: cout << "sql:[" << cxt_.pSql << "] toString code:" << code << ", strerror:" << tstrerror(code) << endl; return string(); } + SNode* pNode; + code = nodesStringToNode(pStr, &pNode); + if (code != TSDB_CODE_SUCCESS) { + tfree(pStr); + cout << "sql:[" << cxt_.pSql << "] toObject code:" << code << ", strerror:" << tstrerror(code) << endl; + return string(); + } + nodesDestroyNode(pNode); string str(pStr); tfree(pStr); return str; @@ -151,8 +159,3 @@ TEST_F(PlannerTest, subquery) { bind("SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b"); ASSERT_TRUE(run()); } - -TEST_F(PlannerTest, createTable) { - bind("create table t1(ts timestamp, c1 int)"); - ASSERT_TRUE(run()); -} diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index a8649f1ea7..0cfaf4c96e 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -98,7 +98,6 @@ int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void SJson* pJobj = tjsonCreateObject(); if (NULL == pJobj || TSDB_CODE_SUCCESS != func(pObj, pJobj)) { - printf("%s:%d code = %d\n", __FUNCTION__, __LINE__, TSDB_CODE_FAILED); tjsonDelete(pJobj); return TSDB_CODE_FAILED; } @@ -159,9 +158,8 @@ int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal if (NULL == p) { return TSDB_CODE_FAILED; } - char* pEnd = NULL; - *pVal = strtol(p, &pEnd, 10); - return (NULL == pEnd ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED); + *pVal = strtol(p, NULL, 10); + return TSDB_CODE_SUCCESS; } int32_t tjsonGetIntValue(const SJson* pJson, const char* pName, int32_t* pVal) { @@ -190,9 +188,8 @@ int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pV if (NULL == p) { return TSDB_CODE_FAILED; } - char* pEnd = NULL; - *pVal = strtoul(p, &pEnd, 10); - return (NULL == pEnd ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED); + *pVal = strtoul(p, NULL, 10); + return TSDB_CODE_SUCCESS; } int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal) { @@ -204,7 +201,7 @@ int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pV int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal) { const SJson* pObject = tjsonGetObjectItem(pJson, pName); - if (cJSON_IsBool(pObject)) { + if (!cJSON_IsBool(pObject)) { return TSDB_CODE_FAILED; } *pVal = cJSON_IsTrue(pObject) ? true : false;