diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 12c7c7a37a..82ee3b51e4 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -18,10 +18,8 @@ #define NODES_MSG_DEFAULT_LEN 1024 -#define tlvDecodeEnum(pTlv, value) tlvDecodeImpl(pTlv, &(value), sizeof(value)) - #define tlvForEach(pDecoder, pTlv, code) \ - while (TSDB_CODE_SUCCESS == (code = tlvGetNextTlv(pDecoder, &pTlv)) && NULL != pTlv) + while (TSDB_CODE_SUCCESS == code && TSDB_CODE_SUCCESS == (code = tlvGetNextTlv(pDecoder, &pTlv)) && NULL != pTlv) typedef struct STlv { int16_t type; @@ -33,6 +31,7 @@ typedef struct STlvEncoder { int32_t allocSize; int32_t offset; char* pBuf; + int32_t tlvCount; } STlvEncoder; typedef struct STlvDecoder { @@ -56,6 +55,7 @@ static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj); static int32_t initTlvEncoder(STlvEncoder* pEncoder) { pEncoder->allocSize = NODES_MSG_DEFAULT_LEN; pEncoder->offset = 0; + pEncoder->tlvCount = 0; pEncoder->pBuf = taosMemoryMalloc(pEncoder->allocSize); return NULL == pEncoder->pBuf ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS; } @@ -66,6 +66,7 @@ static void endTlvEncode(STlvEncoder* pEncoder, char** pMsg, int32_t* pLen) { *pMsg = pEncoder->pBuf; pEncoder->pBuf = NULL; *pLen = pEncoder->offset; + // nodesWarn("encode tlv count = %d, tl size = %d", pEncoder->tlvCount, sizeof(STlv) * pEncoder->tlvCount); } static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pValue, int16_t len) { @@ -82,6 +83,7 @@ static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pV pTlv->len = len; memcpy(pTlv->value, pValue, len); pEncoder->offset += sizeof(STlv) + pTlv->len; + ++(pEncoder->tlvCount); return TSDB_CODE_SUCCESS; } @@ -97,6 +99,10 @@ static int32_t tlvEncodeI32(STlvEncoder* pEncoder, int16_t type, int32_t value) return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); } +static int32_t tlvEncodeI64(STlvEncoder* pEncoder, int16_t type, int64_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + static int32_t tlvEncodeU8(STlvEncoder* pEncoder, int16_t type, uint8_t value) { return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); } @@ -109,10 +115,38 @@ static int32_t tlvEncodeU64(STlvEncoder* pEncoder, int16_t type, uint64_t value) return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); } +static int32_t tlvEncodeDouble(STlvEncoder* pEncoder, int16_t type, double value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeEnum(STlvEncoder* pEncoder, int16_t type, int32_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeBool(STlvEncoder* pEncoder, int16_t type, bool value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pValue) { return tlvEncodeImpl(pEncoder, type, pValue, strlen(pValue)); } +static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, const void* pObj) { + if (NULL == pObj) { + return TSDB_CODE_SUCCESS; + } + + int32_t start = pEncoder->offset; + pEncoder->offset += sizeof(STlv); + int32_t code = func(pObj, pEncoder); + if (TSDB_CODE_SUCCESS == code) { + STlv* pTlv = (STlv*)(pEncoder->pBuf + start); + pTlv->type = type; + pTlv->len = pEncoder->offset - start - sizeof(STlv); + } + return code; +} + static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg func, const void* pArray, int32_t itemSize, int32_t num) { int32_t code = TSDB_CODE_SUCCESS; @@ -120,7 +154,7 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun int32_t start = pEncoder->offset; pEncoder->offset += sizeof(STlv); for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < num; ++i) { - code = func(pArray + i * itemSize, pEncoder); + code = tlvEncodeObj(pEncoder, 0, func, pArray + i * itemSize); } if (TSDB_CODE_SUCCESS == code) { STlv* pTlv = (STlv*)(pEncoder->pBuf + start); @@ -131,18 +165,6 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun return code; } -static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, const void* pObj) { - int32_t start = pEncoder->offset; - pEncoder->offset += sizeof(STlv); - int32_t code = func(pObj, pEncoder); - if (TSDB_CODE_SUCCESS == code) { - STlv* pTlv = (STlv*)(pEncoder->pBuf + start); - pTlv->type = type; - pTlv->len = pEncoder->offset - start - sizeof(STlv); - } - return code; -} - static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) { if (pDecoder->offset == pDecoder->bufSize) { *pTlv = NULL; @@ -153,7 +175,7 @@ static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) { if ((*pTlv)->len + pDecoder->offset > pDecoder->bufSize) { return TSDB_CODE_FAILED; } - pDecoder->offset += (*pTlv)->len; + pDecoder->offset += sizeof(STlv) + (*pTlv)->len; return TSDB_CODE_SUCCESS; } @@ -173,10 +195,37 @@ static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) { return tlvDecodeImpl( static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } +static int32_t tlvDecodeI64(STlv* pTlv, int64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + static int32_t tlvDecodeU8(STlv* pTlv, uint8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } +static int32_t tlvDecodeU16(STlv* pTlv, uint16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } +static int32_t tlvDecodeDouble(STlv* pTlv, double* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeBool(STlv* pTlv, bool* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeEnum(STlv* pTlv, void* pValue, int16_t len) { + int32_t value = 0; + memcpy(&value, pTlv->value, pTlv->len); + switch (len) { + case 1: + *(int8_t*)pValue = value; + break; + case 2: + *(int16_t*)pValue = value; + break; + case 4: + *(int32_t*)pValue = value; + break; + default: + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} + static int32_t tlvDecodeCStr(STlv* pTlv, char* pValue) { memcpy(pValue, pTlv->value, pTlv->len); return TSDB_CODE_SUCCESS; @@ -196,6 +245,19 @@ static int32_t tlvDecodeObj(STlvDecoder* pDecoder, FToObject func, void* pObj) { return code; } +static int32_t tlvDecodeObjArray(STlvDecoder* pDecoder, FToObject func, void* pArray, int32_t itemSize) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t i = 0; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { code = tlvDecodeObjFromTlv(pTlv, func, pArray + itemSize * i++); } + return code; +} + +static int32_t tlvDecodeObjArrayFromTlv(STlv* pTlv, FToObject func, void* pArray, int32_t itemSize) { + STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; + return tlvDecodeObjArray(&decoder, func, pArray, itemSize); +} + static int32_t tlvDecodeDynObjFromTlv(STlv* pTlv, FMakeObject makeFunc, FToObject toFunc, void** pObj) { *pObj = makeFunc(pTlv->type); if (NULL == *pObj) { @@ -213,8 +275,6 @@ static int32_t tlvDecodeDynObj(STlvDecoder* pDecoder, FMakeObject makeFunc, FToO return code; } -static void* makeNodeList(int16_t type) { return nodesMakeList(); } - enum { DATA_TYPE_CODE_TYPE = 1, DATA_TYPE_CODE_PRECISION, DATA_TYPE_CODE_SCALE, DATA_TYPE_CODE_BYTES }; static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -254,7 +314,6 @@ static int32_t msgToDataType(STlvDecoder* pDecoder, void* pObj) { code = tlvDecodeI32(pTlv, &pNode->bytes); break; default: - code = TSDB_CODE_FAILED; break; } } @@ -280,7 +339,6 @@ static int32_t msgToExprNode(STlvDecoder* pDecoder, void* pObj) { code = tlvDecodeObjFromTlv(pTlv, msgToDataType, &pNode->resType); break; default: - code = TSDB_CODE_FAILED; break; } } @@ -312,7 +370,7 @@ static int32_t columnNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { code = tlvEncodeI16(pEncoder, COLUMN_CODE_COLUMN_ID, pNode->colId); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI8(pEncoder, COLUMN_CODE_COLUMN_TYPE, pNode->colType); + code = tlvEncodeEnum(pEncoder, COLUMN_CODE_COLUMN_TYPE, pNode->colType); } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeI16(pEncoder, COLUMN_CODE_DATABLOCK_ID, pNode->dataBlockId); @@ -344,7 +402,7 @@ static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) { code = tlvDecodeI16(pTlv, &pNode->colId); break; case COLUMN_CODE_COLUMN_TYPE: - code = tlvDecodeEnum(pTlv, pNode->colType); + code = tlvDecodeEnum(pTlv, &pNode->colType, sizeof(pNode->colType)); break; case COLUMN_CODE_DATABLOCK_ID: code = tlvDecodeI16(pTlv, &pNode->dataBlockId); @@ -353,7 +411,605 @@ static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) { code = tlvDecodeI16(pTlv, &pNode->slotId); break; default: - code = TSDB_CODE_FAILED; + break; + } + } + + return code; +} + +enum { NAME_CODE_TYPE = 1, NAME_CODE_ACCT_ID, NAME_CODE_DB_NAME, NAME_CODE_TABLE_NAME }; + +static int32_t nameToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SName* pNode = (const SName*)pObj; + + int32_t code = tlvEncodeU8(pEncoder, NAME_CODE_TYPE, pNode->type); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, NAME_CODE_ACCT_ID, pNode->acctId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, NAME_CODE_DB_NAME, pNode->dbname); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, NAME_CODE_TABLE_NAME, pNode->tname); + } + + return code; +} + +static int32_t msgToName(STlvDecoder* pDecoder, void* pObj) { + SName* pNode = (SName*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case NAME_CODE_TYPE: + code = tlvDecodeU8(pTlv, &pNode->type); + break; + case NAME_CODE_ACCT_ID: + code = tlvDecodeI32(pTlv, &pNode->acctId); + break; + case NAME_CODE_DB_NAME: + code = tlvDecodeCStr(pTlv, pNode->dbname); + break; + case NAME_CODE_TABLE_NAME: + code = tlvDecodeCStr(pTlv, pNode->tname); + break; + default: + break; + } + } + + return code; +} + +enum { TIME_WINDOW_CODE_START_KEY = 1, TIME_WINDOW_CODE_END_KEY }; + +static int32_t timeWindowToMsg(const void* pObj, STlvEncoder* pEncoder) { + const STimeWindow* pNode = (const STimeWindow*)pObj; + + int32_t code = tlvEncodeI64(pEncoder, TIME_WINDOW_CODE_START_KEY, pNode->skey); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, TIME_WINDOW_CODE_END_KEY, pNode->ekey); + } + + return code; +} + +static int32_t msgToTimeWindow(STlvDecoder* pDecoder, void* pObj) { + STimeWindow* pNode = (STimeWindow*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case TIME_WINDOW_CODE_START_KEY: + code = tlvDecodeI64(pTlv, &pNode->skey); + break; + case TIME_WINDOW_CODE_END_KEY: + code = tlvDecodeI64(pTlv, &pNode->ekey); + break; + default: + break; + } + } + + return code; +} + +enum { NODE_LIST_CODE_DATA_TYPE = 1, NODE_LIST_CODE_NODE_LIST }; + +static int32_t nodeListNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SNodeListNode* pNode = (const SNodeListNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, NODE_LIST_CODE_DATA_TYPE, dataTypeToMsg, &pNode->dataType); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, NODE_LIST_CODE_NODE_LIST, nodeListToMsg, pNode->pNodeList); + } + + return code; +} + +static int32_t msgToNodeListNode(STlvDecoder* pDecoder, void* pObj) { + SNodeListNode* pNode = (SNodeListNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case NODE_LIST_CODE_DATA_TYPE: + code = tlvDecodeObjFromTlv(pTlv, msgToDataType, &pNode->dataType); + break; + case NODE_LIST_CODE_NODE_LIST: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pNodeList); + break; + default: + break; + } + } + + return code; +} + +enum { TARGET_CODE_DATA_BLOCK_ID = 1, TARGET_CODE_SLOT_ID, TARGET_CODE_EXPR }; + +static int32_t targetNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const STargetNode* pNode = (const STargetNode*)pObj; + + int32_t code = tlvEncodeI16(pEncoder, TARGET_CODE_DATA_BLOCK_ID, pNode->dataBlockId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI16(pEncoder, TARGET_CODE_SLOT_ID, pNode->slotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, TARGET_CODE_EXPR, nodeToMsg, pNode->pExpr); + } + + return code; +} + +static int32_t msgToTargetNode(STlvDecoder* pDecoder, void* pObj) { + STargetNode* pNode = (STargetNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case TARGET_CODE_DATA_BLOCK_ID: + code = tlvDecodeI16(pTlv, &pNode->dataBlockId); + break; + case TARGET_CODE_SLOT_ID: + code = tlvDecodeI16(pTlv, &pNode->slotId); + break; + case TARGET_CODE_EXPR: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pExpr); + break; + default: + break; + } + } + + return code; +} + +enum { + DATA_BLOCK_DESC_CODE_DATA_BLOCK_ID = 1, + DATA_BLOCK_DESC_CODE_SLOTS, + DATA_BLOCK_DESC_CODE_TOTAL_ROW_SIZE, + DATA_BLOCK_DESC_CODE_OUTPUT_ROW_SIZE, + DATA_BLOCK_DESC_CODE_PRECISION +}; + +static int32_t dataBlockDescNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SDataBlockDescNode* pNode = (const SDataBlockDescNode*)pObj; + + int32_t code = tlvEncodeI16(pEncoder, DATA_BLOCK_DESC_CODE_DATA_BLOCK_ID, pNode->dataBlockId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, DATA_BLOCK_DESC_CODE_SLOTS, nodeListToMsg, pNode->pSlots); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, DATA_BLOCK_DESC_CODE_TOTAL_ROW_SIZE, pNode->totalRowSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, DATA_BLOCK_DESC_CODE_OUTPUT_ROW_SIZE, pNode->outputRowSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU8(pEncoder, DATA_BLOCK_DESC_CODE_PRECISION, pNode->precision); + } + + return code; +} + +static int32_t msgToDataBlockDescNode(STlvDecoder* pDecoder, void* pObj) { + SDataBlockDescNode* pNode = (SDataBlockDescNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case DATA_BLOCK_DESC_CODE_DATA_BLOCK_ID: + code = tlvDecodeI16(pTlv, &pNode->dataBlockId); + break; + case DATA_BLOCK_DESC_CODE_SLOTS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSlots); + break; + case DATA_BLOCK_DESC_CODE_TOTAL_ROW_SIZE: + code = tlvDecodeI32(pTlv, &pNode->totalRowSize); + break; + case DATA_BLOCK_DESC_CODE_OUTPUT_ROW_SIZE: + code = tlvDecodeI32(pTlv, &pNode->outputRowSize); + break; + case DATA_BLOCK_DESC_CODE_PRECISION: + code = tlvDecodeU8(pTlv, &pNode->precision); + break; + default: + break; + } + } + + return code; +} + +enum { + SLOT_DESC_CODE_SLOT_ID = 1, + SLOT_DESC_CODE_DATA_TYPE, + SLOT_DESC_CODE_RESERVE, + SLOT_DESC_CODE_OUTPUT, + SLOT_DESC_CODE_TAG +}; + +static int32_t slotDescNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SSlotDescNode* pNode = (const SSlotDescNode*)pObj; + + int32_t code = tlvEncodeI16(pEncoder, SLOT_DESC_CODE_SLOT_ID, pNode->slotId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SLOT_DESC_CODE_DATA_TYPE, dataTypeToMsg, &pNode->dataType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, SLOT_DESC_CODE_RESERVE, pNode->reserve); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, SLOT_DESC_CODE_OUTPUT, pNode->output); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, SLOT_DESC_CODE_TAG, pNode->tag); + } + + return code; +} + +static int32_t msgToSlotDescNode(STlvDecoder* pDecoder, void* pObj) { + SSlotDescNode* pNode = (SSlotDescNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case SLOT_DESC_CODE_SLOT_ID: + code = tlvDecodeI16(pTlv, &pNode->slotId); + break; + case SLOT_DESC_CODE_DATA_TYPE: + code = tlvDecodeObjFromTlv(pTlv, msgToDataType, &pNode->dataType); + break; + case SLOT_DESC_CODE_RESERVE: + code = tlvDecodeBool(pTlv, &pNode->reserve); + break; + case SLOT_DESC_CODE_OUTPUT: + code = tlvDecodeBool(pTlv, &pNode->output); + break; + case SLOT_DESC_CODE_TAG: + code = tlvDecodeBool(pTlv, &pNode->tag); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_NODE_CODE_OUTPUT_DESC = 1, + PHY_NODE_CODE_CONDITIONS, + PHY_NODE_CODE_CHILDREN, + PHY_NODE_CODE_LIMIT, + PHY_NODE_CODE_SLIMIT +}; + +static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SPhysiNode* pNode = (const SPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_NODE_CODE_OUTPUT_DESC, nodeToMsg, pNode->pOutputDataBlockDesc); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_NODE_CODE_CONDITIONS, nodeToMsg, pNode->pConditions); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_NODE_CODE_CHILDREN, nodeListToMsg, pNode->pChildren); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_NODE_CODE_LIMIT, nodeToMsg, pNode->pLimit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_NODE_CODE_SLIMIT, nodeToMsg, pNode->pSlimit); + } + + return code; +} + +static int32_t msgToPhysiNode(STlvDecoder* pDecoder, void* pObj) { + SPhysiNode* pNode = (SPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_NODE_CODE_OUTPUT_DESC: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pOutputDataBlockDesc); + break; + case PHY_NODE_CODE_CONDITIONS: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pConditions); + break; + case PHY_NODE_CODE_CHILDREN: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pChildren); + break; + case PHY_NODE_CODE_LIMIT: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pLimit); + break; + case PHY_NODE_CODE_SLIMIT: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pSlimit); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_SCAN_CODE_BASE_NODE = 1, + PHY_SCAN_CODE_SCAN_COLS, + PHY_SCAN_CODE_SCAN_PSEUDO_COLS, + PHY_SCAN_CODE_BASE_UID, + PHY_SCAN_CODE_BASE_SUID, + PHY_SCAN_CODE_BASE_TABLE_TYPE, + PHY_SCAN_CODE_BASE_TABLE_NAME +}; + +static int32_t physiScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SScanPhysiNode* pNode = (const SScanPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_SCAN_COLS, nodeListToMsg, pNode->pScanCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_SCAN_PSEUDO_COLS, nodeListToMsg, pNode->pScanPseudoCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, PHY_SCAN_CODE_BASE_UID, pNode->uid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, PHY_SCAN_CODE_BASE_SUID, pNode->suid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_SCAN_CODE_BASE_TABLE_TYPE, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_BASE_TABLE_NAME, nameToMsg, &pNode->tableName); + } + + return code; +} + +static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) { + SScanPhysiNode* pNode = (SScanPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_SCAN_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_SCAN_CODE_SCAN_COLS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanCols); + break; + case PHY_SCAN_CODE_SCAN_PSEUDO_COLS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanPseudoCols); + break; + case PHY_SCAN_CODE_BASE_UID: + code = tlvDecodeU64(pTlv, &pNode->uid); + break; + case PHY_SCAN_CODE_BASE_SUID: + code = tlvDecodeU64(pTlv, &pNode->suid); + break; + case PHY_SCAN_CODE_BASE_TABLE_TYPE: + code = tlvDecodeI8(pTlv, &pNode->tableType); + break; + case PHY_SCAN_CODE_BASE_TABLE_NAME: + code = tlvDecodeObjFromTlv(pTlv, msgToName, &pNode->tableName); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_TABLE_SCAN_CODE_SCAN = 1, + PHY_TABLE_SCAN_CODE_SCAN_COUNT, + PHY_TABLE_SCAN_CODE_REVERSE_SCAN_COUNT, + PHY_TABLE_SCAN_CODE_SCAN_RANGE, + PHY_TABLE_SCAN_CODE_RATIO, + PHY_TABLE_SCAN_CODE_DATA_REQUIRED, + PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS, + PHY_TABLE_SCAN_CODE_GROUP_TAGS, + PHY_TABLE_SCAN_CODE_GROUP_SORT, + PHY_TABLE_SCAN_CODE_INTERVAL, + PHY_TABLE_SCAN_CODE_OFFSET, + PHY_TABLE_SCAN_CODE_SLIDING, + PHY_TABLE_SCAN_CODE_INTERVAL_UNIT, + PHY_TABLE_SCAN_CODE_SLIDING_UNIT, + PHY_TABLE_SCAN_CODE_TRIGGER_TYPE, + PHY_TABLE_SCAN_CODE_WATERMARK, + PHY_TABLE_SCAN_CODE_IG_EXPIRED, + PHY_TABLE_SCAN_CODE_ASSIGN_BLOCK_UID, +}; + +static int32_t physiTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_SCAN, physiScanNodeToMsg, &pNode->scan); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU8(pEncoder, PHY_TABLE_SCAN_CODE_SCAN_COUNT, pNode->scanSeq[0]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU8(pEncoder, PHY_TABLE_SCAN_CODE_REVERSE_SCAN_COUNT, pNode->scanSeq[1]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_SCAN_RANGE, timeWindowToMsg, &pNode->scanRange); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeDouble(pEncoder, PHY_TABLE_SCAN_CODE_RATIO, pNode->ratio); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_TABLE_SCAN_CODE_DATA_REQUIRED, pNode->dataRequired); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS, nodeListToMsg, pNode->pDynamicScanFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_GROUP_TAGS, nodeListToMsg, pNode->pGroupTags); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_TABLE_SCAN_CODE_GROUP_SORT, pNode->groupSort); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_INTERVAL, pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_OFFSET, pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_SLIDING, pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_INTERVAL_UNIT, pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_SLIDING_UNIT, pNode->slidingUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_TRIGGER_TYPE, pNode->triggerType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_WATERMARK, pNode->watermark); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_IG_EXPIRED, pNode->igExpired); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_TABLE_SCAN_CODE_ASSIGN_BLOCK_UID, pNode->assignBlockUid); + } + + return code; +} + +static int32_t msgToPhysiTableScanNode(STlvDecoder* pDecoder, void* pObj) { + STableScanPhysiNode* pNode = (STableScanPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_TABLE_SCAN_CODE_SCAN: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiScanNode, &pNode->scan); + break; + case PHY_TABLE_SCAN_CODE_SCAN_COUNT: + code = tlvDecodeU8(pTlv, pNode->scanSeq); + break; + case PHY_TABLE_SCAN_CODE_REVERSE_SCAN_COUNT: + code = tlvDecodeU8(pTlv, pNode->scanSeq + 1); + break; + case PHY_TABLE_SCAN_CODE_SCAN_RANGE: + code = tlvDecodeObjFromTlv(pTlv, msgToTimeWindow, &pNode->scanRange); + break; + case PHY_TABLE_SCAN_CODE_RATIO: + code = tlvDecodeDouble(pTlv, &pNode->ratio); + break; + case PHY_TABLE_SCAN_CODE_DATA_REQUIRED: + code = tlvDecodeI32(pTlv, &pNode->dataRequired); + break; + case PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pDynamicScanFuncs); + break; + case PHY_TABLE_SCAN_CODE_GROUP_TAGS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pGroupTags); + break; + case PHY_TABLE_SCAN_CODE_GROUP_SORT: + code = tlvDecodeBool(pTlv, &pNode->groupSort); + break; + case PHY_TABLE_SCAN_CODE_INTERVAL: + code = tlvDecodeI64(pTlv, &pNode->interval); + break; + case PHY_TABLE_SCAN_CODE_OFFSET: + code = tlvDecodeI64(pTlv, &pNode->offset); + break; + case PHY_TABLE_SCAN_CODE_SLIDING: + code = tlvDecodeI64(pTlv, &pNode->sliding); + break; + case PHY_TABLE_SCAN_CODE_INTERVAL_UNIT: + code = tlvDecodeI8(pTlv, &pNode->intervalUnit); + break; + case PHY_TABLE_SCAN_CODE_SLIDING_UNIT: + code = tlvDecodeI8(pTlv, &pNode->slidingUnit); + break; + case PHY_TABLE_SCAN_CODE_TRIGGER_TYPE: + code = tlvDecodeI8(pTlv, &pNode->triggerType); + break; + case PHY_TABLE_SCAN_CODE_WATERMARK: + code = tlvDecodeI64(pTlv, &pNode->watermark); + break; + case PHY_TABLE_SCAN_CODE_IG_EXPIRED: + code = tlvDecodeI8(pTlv, &pNode->igExpired); + break; + case PHY_TABLE_SCAN_CODE_ASSIGN_BLOCK_UID: + code = tlvDecodeBool(pTlv, &pNode->assignBlockUid); + break; + default: + break; + } + } + + return code; +} + +enum { PHY_DATA_SINK_CODE_INPUT_DESC = 1 }; + +static int32_t physicDataSinkNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SDataSinkNode* pNode = (const SDataSinkNode*)pObj; + return tlvEncodeObj(pEncoder, PHY_DATA_SINK_CODE_INPUT_DESC, nodeToMsg, pNode->pInputDataBlockDesc); +} + +static int32_t msgToPhysicDataSinkNode(STlvDecoder* pDecoder, void* pObj) { + SDataSinkNode* pNode = (SDataSinkNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_DATA_SINK_CODE_INPUT_DESC: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pInputDataBlockDesc); + break; + default: + break; + } + } + + return code; +} + +enum { PHY_DISPATCH_CODE_BASE_CODE = 1 }; + +static int32_t physiDispatchNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SDataDispatcherNode* pNode = (const SDataDispatcherNode*)pObj; + return tlvEncodeObj(pEncoder, PHY_DISPATCH_CODE_BASE_CODE, physicDataSinkNodeToMsg, &pNode->sink); +} + +static int32_t msgToPhysiDispatchNode(STlvDecoder* pDecoder, void* pObj) { + SDataDispatcherNode* pNode = (SDataDispatcherNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_DISPATCH_CODE_BASE_CODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysicDataSinkNode, &pNode->sink); + break; + default: break; } } @@ -377,6 +1033,30 @@ static int32_t subplanIdToMsg(const void* pObj, STlvEncoder* pEncoder) { return code; } +static int32_t msgToSubplanId(STlvDecoder* pDecoder, void* pObj) { + SSubplanId* pNode = (SSubplanId*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case SUBPLAN_ID_CODE_QUERY_ID: + code = tlvDecodeU64(pTlv, &pNode->queryId); + break; + case SUBPLAN_ID_CODE_GROUP_ID: + code = tlvDecodeI32(pTlv, &pNode->groupId); + break; + case SUBPLAN_ID_CODE_SUBPLAN_ID: + code = tlvDecodeI32(pTlv, &pNode->subplanId); + break; + default: + break; + } + } + + return code; +} + enum { EP_CODE_FQDN = 1, EP_CODE_port }; static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -390,6 +1070,27 @@ static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) { return code; } +static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) { + SEp* pNode = (SEp*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case EP_CODE_FQDN: + code = tlvDecodeCStr(pTlv, pNode->fqdn); + break; + case EP_CODE_port: + code = tlvDecodeU16(pTlv, &pNode->port); + break; + default: + break; + } + } + + return code; +} + enum { QUERY_NODE_ADDR_CODE_NODE_ID = 1, QUERY_NODE_ADDR_CODE_IN_USE, @@ -402,10 +1103,10 @@ static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) { int32_t code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_NODE_ID, pNode->nodeId); if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_IN_USE, pNode->epSet.inUse); + code = tlvEncodeI8(pEncoder, QUERY_NODE_ADDR_CODE_IN_USE, pNode->epSet.inUse); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_NUM_OF_EPS, pNode->epSet.numOfEps); + code = tlvEncodeI8(pEncoder, QUERY_NODE_ADDR_CODE_NUM_OF_EPS, pNode->epSet.numOfEps); } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObjArray(pEncoder, QUERY_NODE_ADDR_CODE_EPS, epToMsg, pNode->epSet.eps, sizeof(SEp), @@ -415,6 +1116,33 @@ static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) { return code; } +static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) { + SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case QUERY_NODE_ADDR_CODE_NODE_ID: + code = tlvDecodeI32(pTlv, &pNode->nodeId); + break; + case QUERY_NODE_ADDR_CODE_IN_USE: + code = tlvDecodeI8(pTlv, &pNode->epSet.inUse); + break; + case QUERY_NODE_ADDR_CODE_NUM_OF_EPS: + code = tlvDecodeI8(pTlv, &pNode->epSet.numOfEps); + break; + case QUERY_NODE_ADDR_CODE_EPS: + code = tlvDecodeObjArrayFromTlv(pTlv, msgToEp, pNode->epSet.eps, sizeof(SEp)); + break; + default: + break; + } + } + + return code; +} + enum { SUBPLAN_CODE_SUBPLAN_ID = 1, SUBPLAN_CODE_SUBPLAN_TYPE, @@ -434,7 +1162,7 @@ static int32_t subplanToMsg(const void* pObj, STlvEncoder* pEncoder) { int32_t code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_SUBPLAN_ID, subplanIdToMsg, &pNode->id); if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI8(pEncoder, SUBPLAN_CODE_SUBPLAN_TYPE, pNode->subplanType); + code = tlvEncodeEnum(pEncoder, SUBPLAN_CODE_SUBPLAN_TYPE, pNode->subplanType); } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeI32(pEncoder, SUBPLAN_CODE_MSG_TYPE, pNode->msgType); @@ -475,10 +1203,10 @@ static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) { tlvForEach(pDecoder, pTlv, code) { switch (pTlv->type) { case SUBPLAN_CODE_SUBPLAN_ID: - // code = tlvDecodeObjFromTlv(pTlv, msgToSubplanId, &pNode->id); + code = tlvDecodeObjFromTlv(pTlv, msgToSubplanId, &pNode->id); break; case SUBPLAN_CODE_SUBPLAN_TYPE: - code = tlvDecodeEnum(pTlv, pNode->subplanType); + code = tlvDecodeEnum(pTlv, &pNode->subplanType, sizeof(pNode->subplanType)); break; case SUBPLAN_CODE_MSG_TYPE: code = tlvDecodeI32(pTlv, &pNode->msgType); @@ -493,7 +1221,7 @@ static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) { code = tlvDecodeCStr(pTlv, pNode->user); break; case SUBPLAN_CODE_EXECNODE: - // code = tlvDecodeObjFromTlv(pTlv, msgToQueryNodeAddr, &pNode->execNode); + code = tlvDecodeObjFromTlv(pTlv, msgToQueryNodeAddr, &pNode->execNode); break; case SUBPLAN_CODE_ROOT_NODE: code = msgToNodeFromTlv(pTlv, (void**)&pNode->pNode); @@ -508,7 +1236,6 @@ static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) { code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagIndexCond); break; default: - code = TSDB_CODE_FAILED; break; } } @@ -549,7 +1276,6 @@ static int32_t msgToQueryPlan(STlvDecoder* pDecoder, void* pObj) { code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSubplans); break; default: - code = TSDB_CODE_FAILED; break; } } @@ -558,33 +1284,83 @@ static int32_t msgToQueryPlan(STlvDecoder* pDecoder, void* pObj) { } static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pObj)) { case QUERY_NODE_COLUMN: - return columnNodeToMsg(pObj, pEncoder); - case QUERY_NODE_PHYSICAL_SUBPLAN: - return subplanToMsg(pObj, pEncoder); - case QUERY_NODE_PHYSICAL_PLAN: - return queryPlanToMsg(pObj, pEncoder); - default: + code = columnNodeToMsg(pObj, pEncoder); break; - }; - nodesWarn("specificNodeToMsg unknown node = %s", nodesNodeName(nodeType(pObj))); - return TSDB_CODE_SUCCESS; + case QUERY_NODE_NODE_LIST: + code = nodeListNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_TARGET: + code = targetNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_DATABLOCK_DESC: + code = dataBlockDescNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_SLOT_DESC: + code = slotDescNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + code = physiTableScanNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: + code = physiDispatchNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_SUBPLAN: + code = subplanToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN: + code = queryPlanToMsg(pObj, pEncoder); + break; + default: + nodesWarn("specificNodeToMsg unknown node = %s", nodesNodeName(nodeType(pObj))); + break; + } + if (TSDB_CODE_SUCCESS != code) { + nodesError("specificNodeToMsg error node = %s", nodesNodeName(nodeType(pObj))); + } + return code; } static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { + int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pObj)) { case QUERY_NODE_COLUMN: - return msgToColumnNode(pDecoder, pObj); - case QUERY_NODE_PHYSICAL_SUBPLAN: - return msgToSubplan(pDecoder, pObj); - case QUERY_NODE_PHYSICAL_PLAN: - return msgToQueryPlan(pDecoder, pObj); - default: + code = msgToColumnNode(pDecoder, pObj); break; - }; - nodesWarn("msgToSpecificNode unknown node = %s", nodesNodeName(nodeType(pObj))); - return TSDB_CODE_SUCCESS; + case QUERY_NODE_NODE_LIST: + code = msgToNodeListNode(pDecoder, pObj); + break; + case QUERY_NODE_TARGET: + code = msgToTargetNode(pDecoder, pObj); + break; + case QUERY_NODE_DATABLOCK_DESC: + code = msgToDataBlockDescNode(pDecoder, pObj); + break; + case QUERY_NODE_SLOT_DESC: + code = msgToSlotDescNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + code = msgToPhysiTableScanNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: + code = msgToPhysiDispatchNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_SUBPLAN: + code = msgToSubplan(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN: + code = msgToQueryPlan(pDecoder, pObj); + break; + default: + nodesWarn("msgToSpecificNode unknown node = %s", nodesNodeName(nodeType(pObj))); + break; + } + if (TSDB_CODE_SUCCESS != code) { + nodesError("msgToSpecificNode error node = %s", nodesNodeName(nodeType(pObj))); + } + return code; } static int32_t nodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -614,8 +1390,8 @@ static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder) { return TSDB_CODE_SUCCESS; } -static int32_t msgToNodeListImpl(STlvDecoder* pDecoder, void* pObj) { - SNodeList* pList = (SNodeList*)pObj; +static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj) { + SNodeList* pList = nodesMakeList(); int32_t code = TSDB_CODE_SUCCESS; while (TSDB_CODE_SUCCESS == code && !tlvDecodeEnd(pDecoder)) { @@ -625,13 +1401,14 @@ static int32_t msgToNodeListImpl(STlvDecoder* pDecoder, void* pObj) { code = nodesListAppend(pList, pNode); } } + if (TSDB_CODE_SUCCESS == code) { + *pObj = pList; + } else { + nodesDestroyList(pList); + } return code; } -static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj) { - return tlvDecodeDynObj(pDecoder, makeNodeList, msgToNodeListImpl, pObj); -} - static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj) { STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; return msgToNodeList(&decoder, pObj); diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index 5824f042d0..4763a7ff15 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -19,6 +19,7 @@ #include #include +#include #include "cmdnodes.h" #include "mockCatalogService.h" @@ -323,7 +324,6 @@ class PlannerTestBaseImpl { if (DUMP_MODULE_ALL == module || DUMP_MODULE_PHYSICAL == module) { cout << "+++++++++++++++++++++physical plan : " << endl; cout << res_.physiPlan_ << endl; - cout << "json len: " << res_.physiPlan_.length() << ", msg len: " << res_.physiPlanMsg_.length() << endl; } if (DUMP_MODULE_ALL == module || DUMP_MODULE_SUBPLAN == module) { @@ -412,6 +412,7 @@ class PlannerTestBaseImpl { FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); } } res_.physiPlanMsg_ = toMsg((SNode*)(*pPlan)); + cout << "json len: " << res_.physiPlan_.length() << ", msg len: " << res_.physiPlanMsg_.length() << endl; } void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) { @@ -450,7 +451,12 @@ class PlannerTestBaseImpl { string toString(const SNode* pRoot) { char* pStr = NULL; int32_t len = 0; + + // auto start = chrono::steady_clock::now(); DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len) + // cout << "nodesNodeToString: " + // << chrono::duration_cast(chrono::steady_clock::now() - start).count() << endl; + string str(pStr); taosMemoryFreeClear(pStr); return str; @@ -459,7 +465,24 @@ class PlannerTestBaseImpl { string toMsg(const SNode* pRoot) { char* pStr = NULL; int32_t len = 0; + + // auto start = chrono::steady_clock::now(); DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len) + // cout << "nodesNodeToMsg: " + // << chrono::duration_cast(chrono::steady_clock::now() - start).count() << endl; + + SNode* pNode = NULL; + char* pNewStr = NULL; + int32_t newlen = 0; + DO_WITH_THROW(nodesMsgToNode, pStr, len, &pNode) + DO_WITH_THROW(nodesNodeToMsg, pNode, &pNewStr, &newlen) + if (newlen != len || 0 != memcmp(pStr, pNewStr, len)) { + cout << "nodesNodeToMsg error!!!!!!!!!!!!!! len = " << len << ", newlen = " << newlen << endl; + DO_WITH_THROW(nodesNodeToString, pNode, false, &pNewStr, &newlen) + cout << "nodesNodeToString " << pNewStr << endl; + } + taosMemoryFreeClear(pNewStr); + string str(pStr, len); taosMemoryFreeClear(pStr); return str;