diff --git a/APP_Framework/Applications/app_test/test_mqttclient/Command.png b/APP_Framework/Applications/app_test/test_mqttclient/Command.png new file mode 100644 index 000000000..8394a2fa3 Binary files /dev/null and b/APP_Framework/Applications/app_test/test_mqttclient/Command.png differ diff --git a/APP_Framework/Applications/app_test/test_mqttclient/Compile.png b/APP_Framework/Applications/app_test/test_mqttclient/Compile.png new file mode 100644 index 000000000..4a0d7472c Binary files /dev/null and b/APP_Framework/Applications/app_test/test_mqttclient/Compile.png differ diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTConnect.h b/APP_Framework/Applications/app_test/test_mqttclient/MQTTConnect.h new file mode 100644 index 000000000..3fdee8e1f --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTConnect.h @@ -0,0 +1,137 @@ +/******************************************************************************* + * Copyright (c) 2014, 2017 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Xiang Rong - 442039 Add makefile to Embedded C client + * Ian Craggs - fix for issue #64, bit order in connack response + *******************************************************************************/ + +#ifndef MQTTCONNECT_H_ +#define MQTTCONNECT_H_ + +#if !defined(DLLImport) + #define DLLImport +#endif +#if !defined(DLLExport) + #define DLLExport +#endif + + +typedef union +{ + unsigned char all; /**< all connect flags */ +#if defined(REVERSED) + struct + { + unsigned int username : 1; /**< 3.1 user name */ + unsigned int password : 1; /**< 3.1 password */ + unsigned int willRetain : 1; /**< will retain setting */ + unsigned int willQoS : 2; /**< will QoS value */ + unsigned int will : 1; /**< will flag */ + unsigned int cleansession : 1; /**< clean session flag */ + unsigned int : 1; /**< unused */ + } bits; +#else + struct + { + unsigned int : 1; /**< unused */ + unsigned int cleansession : 1; /**< cleansession flag */ + unsigned int will : 1; /**< will flag */ + unsigned int willQoS : 2; /**< will QoS value */ + unsigned int willRetain : 1; /**< will retain setting */ + unsigned int password : 1; /**< 3.1 password */ + unsigned int username : 1; /**< 3.1 user name */ + } bits; +#endif +} MQTTConnectFlags; /**< connect flags byte */ + + + +/** + * Defines the MQTT "Last Will and Testament" (LWT) settings for + * the connect packet. + */ +typedef struct +{ + /** The eyecatcher for this structure. must be MQTW. */ + char struct_id[4]; + /** The version number of this structure. Must be 0 */ + int struct_version; + /** The LWT topic to which the LWT message will be published. */ + MQTTString topicName; + /** The LWT payload. */ + MQTTString message; + /** + * The retained flag for the LWT message (see MQTTAsync_message.retained). + */ + unsigned char retained; + /** + * The quality of service setting for the LWT message (see + * MQTTAsync_message.qos and @ref qos). + */ + char qos; +} MQTTPacket_willOptions; + + +#define MQTTPacket_willOptions_initializer { {'M', 'Q', 'T', 'W'}, 0, {NULL, {0, NULL}}, {NULL, {0, NULL}}, 0, 0 } + + +typedef struct +{ + /** The eyecatcher for this structure. must be MQTC. */ + char struct_id[4]; + /** The version number of this structure. Must be 0 */ + int struct_version; + /** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1 + */ + unsigned char MQTTVersion; + MQTTString clientID; + unsigned short keepAliveInterval; + unsigned char cleansession; + unsigned char willFlag; + MQTTPacket_willOptions will; + MQTTString username; + MQTTString password; +} MQTTPacket_connectData; + +typedef union +{ + unsigned char all; /**< all connack flags */ +#if defined(REVERSED) + struct + { + unsigned int reserved : 7; /**< unused */ + unsigned int sessionpresent : 1; /**< session present flag */ + } bits; +#else + struct + { + unsigned int sessionpresent : 1; /**< session present flag */ + unsigned int reserved: 7; /**< unused */ + } bits; +#endif +} MQTTConnackFlags; /**< connack flags byte */ + +#define MQTTPacket_connectData_initializer { {'M', 'Q', 'T', 'C'}, 0, 4, {NULL, {0, NULL}}, 60, 1, 0, \ + MQTTPacket_willOptions_initializer, {NULL, {0, NULL}}, {NULL, {0, NULL}} } + +DLLExport int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options); +DLLExport int MQTTDeserialize_connect(MQTTPacket_connectData* data, unsigned char* buf, int len); + +DLLExport int MQTTSerialize_connack(unsigned char* buf, int buflen, unsigned char connack_rc, unsigned char sessionPresent); +DLLExport int MQTTDeserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int buflen); + +DLLExport int MQTTSerialize_disconnect(unsigned char* buf, int buflen); +DLLExport int MQTTSerialize_pingreq(unsigned char* buf, int buflen); + +#endif /* MQTTCONNECT_H_ */ diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTConnectClient.c b/APP_Framework/Applications/app_test/test_mqttclient/MQTTConnectClient.c new file mode 100644 index 000000000..5d5b954d5 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTConnectClient.c @@ -0,0 +1,214 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTPacket.h" +#include "StackTrace.h" + +#include + +/** + * Determines the length of the MQTT connect packet that would be produced using the supplied connect options. + * @param options the options to be used to build the connect packet + * @return the length of buffer needed to contain the serialized version of the packet + */ +int MQTTSerialize_connectLength(MQTTPacket_connectData* options) +{ + int len = 0; + + FUNC_ENTRY; + + if (options->MQTTVersion == 3) + len = 12; /* variable depending on MQTT or MQIsdp */ + else if (options->MQTTVersion == 4) + len = 10; + + len += MQTTstrlen(options->clientID)+2; + if (options->willFlag) + len += MQTTstrlen(options->will.topicName)+2 + MQTTstrlen(options->will.message)+2; + if (options->username.cstring || options->username.lenstring.data) + len += MQTTstrlen(options->username)+2; + if (options->password.cstring || options->password.lenstring.data) + len += MQTTstrlen(options->password)+2; + + FUNC_EXIT_RC(len); + return len; +} + + +/** + * Serializes the connect options into the buffer. + * @param buf the buffer into which the packet will be serialized + * @param len the length in bytes of the supplied buffer + * @param options the options to be used to build the connect packet + * @return serialized length, or error if 0 + */ +int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options) +{ + unsigned char *ptr = buf; + MQTTHeader header = {0}; + MQTTConnectFlags flags = {0}; + int len = 0; + int rc = -1; + + FUNC_ENTRY; + if (MQTTPacket_len(len = MQTTSerialize_connectLength(options)) > buflen) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.byte = 0; + header.bits.type = CONNECT; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, len); /* write remaining length */ + + if (options->MQTTVersion == 4) + { + writeCString(&ptr, "MQTT"); + writeChar(&ptr, (char) 4); + } + else + { + writeCString(&ptr, "MQIsdp"); + writeChar(&ptr, (char) 3); + } + + flags.all = 0; + flags.bits.cleansession = options->cleansession; + flags.bits.will = (options->willFlag) ? 1 : 0; + if (flags.bits.will) + { + flags.bits.willQoS = options->will.qos; + flags.bits.willRetain = options->will.retained; + } + + if (options->username.cstring || options->username.lenstring.data) + flags.bits.username = 1; + if (options->password.cstring || options->password.lenstring.data) + flags.bits.password = 1; + + writeChar(&ptr, flags.all); + writeInt(&ptr, options->keepAliveInterval); + writeMQTTString(&ptr, options->clientID); + if (options->willFlag) + { + writeMQTTString(&ptr, options->will.topicName); + writeMQTTString(&ptr, options->will.message); + } + if (flags.bits.username) + writeMQTTString(&ptr, options->username); + if (flags.bits.password) + writeMQTTString(&ptr, options->password); + + rc = ptr - buf; + + exit: FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Deserializes the supplied (wire) buffer into connack data - return code + * @param sessionPresent the session present flag returned (only for MQTT 3.1.1) + * @param connack_rc returned integer value of the connack return code + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param len the length in bytes of the data in the supplied buffer + * @return error code. 1 is success, 0 is failure + */ +int MQTTDeserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int buflen) +{ + MQTTHeader header = {0}; + unsigned char* curdata = buf; + unsigned char* enddata = NULL; + int rc = 0; + int mylen; + MQTTConnackFlags flags = {0}; + + FUNC_ENTRY; + header.byte = readChar(&curdata); + if (header.bits.type != CONNACK) + goto exit; + + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + if (enddata - curdata < 2) + goto exit; + + flags.all = readChar(&curdata); + *sessionPresent = flags.bits.sessionpresent; + *connack_rc = readChar(&curdata); + + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes a 0-length packet into the supplied buffer, ready for writing to a socket + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer, to avoid overruns + * @param packettype the message type + * @return serialized length, or error if 0 + */ +int MQTTSerialize_zero(unsigned char* buf, int buflen, unsigned char packettype) +{ + MQTTHeader header = {0}; + int rc = -1; + unsigned char *ptr = buf; + + FUNC_ENTRY; + if (buflen < 2) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + header.byte = 0; + header.bits.type = packettype; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, 0); /* write remaining length */ + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes a disconnect packet into the supplied buffer, ready for writing to a socket + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer, to avoid overruns + * @return serialized length, or error if 0 + */ +int MQTTSerialize_disconnect(unsigned char* buf, int buflen) +{ + return MQTTSerialize_zero(buf, buflen, DISCONNECT); +} + + +/** + * Serializes a disconnect packet into the supplied buffer, ready for writing to a socket + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer, to avoid overruns + * @return serialized length, or error if 0 + */ +int MQTTSerialize_pingreq(unsigned char* buf, int buflen) +{ + return MQTTSerialize_zero(buf, buflen, PINGREQ); +} diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTConnectServer.c b/APP_Framework/Applications/app_test/test_mqttclient/MQTTConnectServer.c new file mode 100644 index 000000000..7c1fe506e --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTConnectServer.c @@ -0,0 +1,148 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "StackTrace.h" +#include "MQTTPacket.h" +#include + +#define min(a, b) ((a < b) ? a : b) + + +/** + * Validates MQTT protocol name and version combinations + * @param protocol the MQTT protocol name as an MQTTString + * @param version the MQTT protocol version number, as in the connect packet + * @return correct MQTT combination? 1 is true, 0 is false + */ +int MQTTPacket_checkVersion(MQTTString* protocol, int version) +{ + int rc = 0; + + if (version == 3 && memcmp(protocol->lenstring.data, "MQIsdp", + min(6, protocol->lenstring.len)) == 0) + rc = 1; + else if (version == 4 && memcmp(protocol->lenstring.data, "MQTT", + min(4, protocol->lenstring.len)) == 0) + rc = 1; + return rc; +} + + +/** + * Deserializes the supplied (wire) buffer into connect data structure + * @param data the connect data structure to be filled out + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param len the length in bytes of the data in the supplied buffer + * @return error code. 1 is success, 0 is failure + */ +int MQTTDeserialize_connect(MQTTPacket_connectData* data, unsigned char* buf, int len) +{ + MQTTHeader header = {0}; + MQTTConnectFlags flags = {0}; + unsigned char* curdata = buf; + unsigned char* enddata = &buf[len]; + int rc = 0; + MQTTString Protocol; + int version; + int mylen = 0; + + FUNC_ENTRY; + header.byte = readChar(&curdata); + if (header.bits.type != CONNECT) + goto exit; + + curdata += MQTTPacket_decodeBuf(curdata, &mylen); /* read remaining length */ + + if (!readMQTTLenString(&Protocol, &curdata, enddata) || + enddata - curdata < 0) /* do we have enough data to read the protocol version byte? */ + goto exit; + + version = (int)readChar(&curdata); /* Protocol version */ + /* If we don't recognize the protocol version, we don't parse the connect packet on the + * basis that we don't know what the format will be. + */ + if (MQTTPacket_checkVersion(&Protocol, version)) + { + flags.all = readChar(&curdata); + data->cleansession = flags.bits.cleansession; + data->keepAliveInterval = readInt(&curdata); + if (!readMQTTLenString(&data->clientID, &curdata, enddata)) + goto exit; + data->willFlag = flags.bits.will; + if (flags.bits.will) + { + data->will.qos = flags.bits.willQoS; + data->will.retained = flags.bits.willRetain; + if (!readMQTTLenString(&data->will.topicName, &curdata, enddata) || + !readMQTTLenString(&data->will.message, &curdata, enddata)) + goto exit; + } + if (flags.bits.username) + { + if (enddata - curdata < 3 || !readMQTTLenString(&data->username, &curdata, enddata)) + goto exit; /* username flag set, but no username supplied - invalid */ + if (flags.bits.password && + (enddata - curdata < 3 || !readMQTTLenString(&data->password, &curdata, enddata))) + goto exit; /* password flag set, but no password supplied - invalid */ + } + else if (flags.bits.password) + goto exit; /* password flag set without username - invalid */ + rc = 1; + } +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes the connack packet into the supplied buffer. + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param connack_rc the integer connack return code to be used + * @param sessionPresent the MQTT 3.1.1 sessionPresent flag + * @return serialized length, or error if 0 + */ +int MQTTSerialize_connack(unsigned char* buf, int buflen, unsigned char connack_rc, unsigned char sessionPresent) +{ + MQTTHeader header = {0}; + int rc = 0; + unsigned char *ptr = buf; + MQTTConnackFlags flags = {0}; + + FUNC_ENTRY; + if (buflen < 2) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + header.byte = 0; + header.bits.type = CONNACK; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */ + + flags.all = 0; + flags.bits.sessionpresent = sessionPresent; + writeChar(&ptr, flags.all); + writeChar(&ptr, connack_rc); + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTDeserializePublish.c b/APP_Framework/Applications/app_test/test_mqttclient/MQTTDeserializePublish.c new file mode 100644 index 000000000..dbc9e5d8a --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTDeserializePublish.c @@ -0,0 +1,107 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "StackTrace.h" +#include "MQTTPacket.h" +#include + +#define min(a, b) ((a < b) ? 1 : 0) + +/** + * Deserializes the supplied (wire) buffer into publish data + * @param dup returned integer - the MQTT dup flag + * @param qos returned integer - the MQTT QoS value + * @param retained returned integer - the MQTT retained flag + * @param packetid returned integer - the MQTT packet identifier + * @param topicName returned MQTTString - the MQTT topic in the publish + * @param payload returned byte buffer - the MQTT publish payload + * @param payloadlen returned integer - the length of the MQTT payload + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return error code. 1 is success + */ +int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName, + unsigned char** payload, int* payloadlen, unsigned char* buf, int buflen) +{ + MQTTHeader header = {0}; + unsigned char* curdata = buf; + unsigned char* enddata = NULL; + int rc = 0; + int mylen = 0; + + FUNC_ENTRY; + header.byte = readChar(&curdata); + if (header.bits.type != PUBLISH) + goto exit; + *dup = header.bits.dup; + *qos = header.bits.qos; + *retained = header.bits.retain; + + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + + if (!readMQTTLenString(topicName, &curdata, enddata) || + enddata - curdata < 0) /* do we have enough data to read the protocol version byte? */ + goto exit; + + if (*qos > 0) + *packetid = readInt(&curdata); + + *payloadlen = enddata - curdata; + *payload = curdata; + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + + +/** + * Deserializes the supplied (wire) buffer into an ack + * @param packettype returned integer - the MQTT packet type + * @param dup returned integer - the MQTT dup flag + * @param packetid returned integer - the MQTT packet identifier + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return error code. 1 is success, 0 is failure + */ +int MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen) +{ + MQTTHeader header = {0}; + unsigned char* curdata = buf; + unsigned char* enddata = NULL; + int rc = 0; + int mylen; + + FUNC_ENTRY; + header.byte = readChar(&curdata); + *dup = header.bits.dup; + *packettype = header.bits.type; + + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + + if (enddata - curdata < 2) + goto exit; + *packetid = readInt(&curdata); + + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTFormat.c b/APP_Framework/Applications/app_test/test_mqttclient/MQTTFormat.c new file mode 100644 index 000000000..9f6317725 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTFormat.c @@ -0,0 +1,262 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "StackTrace.h" +#include "MQTTPacket.h" + +#include + + +const char* MQTTPacket_names[] = +{ + "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", + "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", + "PINGREQ", "PINGRESP", "DISCONNECT" +}; + + +const char* MQTTPacket_getName(unsigned short packetid) +{ + return MQTTPacket_names[packetid]; +} + + +int MQTTStringFormat_connect(char* strbuf, int strbuflen, MQTTPacket_connectData* data) +{ + int strindex = 0; + + strindex = snprintf(strbuf, strbuflen, + "CONNECT MQTT version %d, client id %.*s, clean session %d, keep alive %d", + (int)data->MQTTVersion, data->clientID.lenstring.len, data->clientID.lenstring.data, + (int)data->cleansession, data->keepAliveInterval); + if (data->willFlag) + strindex += snprintf(&strbuf[strindex], strbuflen - strindex, + ", will QoS %d, will retain %d, will topic %.*s, will message %.*s", + data->will.qos, data->will.retained, + data->will.topicName.lenstring.len, data->will.topicName.lenstring.data, + data->will.message.lenstring.len, data->will.message.lenstring.data); + if (data->username.lenstring.data && data->username.lenstring.len > 0) + strindex += snprintf(&strbuf[strindex], strbuflen - strindex, + ", user name %.*s", data->username.lenstring.len, data->username.lenstring.data); + if (data->password.lenstring.data && data->password.lenstring.len > 0) + strindex += snprintf(&strbuf[strindex], strbuflen - strindex, + ", password %.*s", data->password.lenstring.len, data->password.lenstring.data); + return strindex; +} + + +int MQTTStringFormat_connack(char* strbuf, int strbuflen, unsigned char connack_rc, unsigned char sessionPresent) +{ + int strindex = snprintf(strbuf, strbuflen, "CONNACK session present %d, rc %d", sessionPresent, connack_rc); + return strindex; +} + + +int MQTTStringFormat_publish(char* strbuf, int strbuflen, unsigned char dup, int qos, unsigned char retained, + unsigned short packetid, MQTTString topicName, unsigned char* payload, int payloadlen) +{ + int strindex = snprintf(strbuf, strbuflen, + "PUBLISH dup %d, QoS %d, retained %d, packet id %d, topic %.*s, payload length %d, payload %.*s", + dup, qos, retained, packetid, + (topicName.lenstring.len < 20) ? topicName.lenstring.len : 20, topicName.lenstring.data, + payloadlen, (payloadlen < 20) ? payloadlen : 20, payload); + return strindex; +} + + +int MQTTStringFormat_ack(char* strbuf, int strbuflen, unsigned char packettype, unsigned char dup, unsigned short packetid) +{ + int strindex = snprintf(strbuf, strbuflen, "%s, packet id %d", MQTTPacket_names[packettype], packetid); + if (dup) + strindex += snprintf(strbuf + strindex, strbuflen - strindex, ", dup %d", dup); + return strindex; +} + + +int MQTTStringFormat_subscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid, int count, + MQTTString topicFilters[], int requestedQoSs[]) +{ + return snprintf(strbuf, strbuflen, + "SUBSCRIBE dup %d, packet id %d count %d topic %.*s qos %d", + dup, packetid, count, + topicFilters[0].lenstring.len, topicFilters[0].lenstring.data, + requestedQoSs[0]); +} + + +int MQTTStringFormat_suback(char* strbuf, int strbuflen, unsigned short packetid, int count, int* grantedQoSs) +{ + return snprintf(strbuf, strbuflen, + "SUBACK packet id %d count %d granted qos %d", packetid, count, grantedQoSs[0]); +} + + +int MQTTStringFormat_unsubscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid, + int count, MQTTString topicFilters[]) +{ + return snprintf(strbuf, strbuflen, + "UNSUBSCRIBE dup %d, packet id %d count %d topic %.*s", + dup, packetid, count, + topicFilters[0].lenstring.len, topicFilters[0].lenstring.data); +} + + +#if defined(MQTT_CLIENT) +char* MQTTFormat_toClientString(char* strbuf, int strbuflen, unsigned char* buf, int buflen) +{ + int index = 0; + int rem_length = 0; + MQTTHeader header = {0}; + int strindex = 0; + + header.byte = buf[index++]; + index += MQTTPacket_decodeBuf(&buf[index], &rem_length); + + switch (header.bits.type) + { + + case CONNACK: + { + unsigned char sessionPresent, connack_rc; + if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) == 1) + strindex = MQTTStringFormat_connack(strbuf, strbuflen, connack_rc, sessionPresent); + } + break; + case PUBLISH: + { + unsigned char dup, retained, *payload; + unsigned short packetid; + int qos, payloadlen; + MQTTString topicName = MQTTString_initializer; + if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, + &payload, &payloadlen, buf, buflen) == 1) + strindex = MQTTStringFormat_publish(strbuf, strbuflen, dup, qos, retained, packetid, + topicName, payload, payloadlen); + } + break; + case PUBACK: + case PUBREC: + case PUBREL: + case PUBCOMP: + { + unsigned char packettype, dup; + unsigned short packetid; + if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1) + strindex = MQTTStringFormat_ack(strbuf, strbuflen, packettype, dup, packetid); + } + break; + case SUBACK: + { + unsigned short packetid; + int maxcount = 1, count = 0; + int grantedQoSs[1]; + if (MQTTDeserialize_suback(&packetid, maxcount, &count, grantedQoSs, buf, buflen) == 1) + strindex = MQTTStringFormat_suback(strbuf, strbuflen, packetid, count, grantedQoSs); + } + break; + case UNSUBACK: + { + unsigned short packetid; + if (MQTTDeserialize_unsuback(&packetid, buf, buflen) == 1) + strindex = MQTTStringFormat_ack(strbuf, strbuflen, UNSUBACK, 0, packetid); + } + break; + case PINGREQ: + case PINGRESP: + case DISCONNECT: + strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]); + break; + } + return strbuf; +} +#endif + +#if defined(MQTT_SERVER) +char* MQTTFormat_toServerString(char* strbuf, int strbuflen, unsigned char* buf, int buflen) +{ + int index = 0; + int rem_length = 0; + MQTTHeader header = {0}; + int strindex = 0; + + header.byte = buf[index++]; + index += MQTTPacket_decodeBuf(&buf[index], &rem_length); + + switch (header.bits.type) + { + case CONNECT: + { + MQTTPacket_connectData data; + int rc; + if ((rc = MQTTDeserialize_connect(&data, buf, buflen)) == 1) + strindex = MQTTStringFormat_connect(strbuf, strbuflen, &data); + } + break; + case PUBLISH: + { + unsigned char dup, retained, *payload; + unsigned short packetid; + int qos, payloadlen; + MQTTString topicName = MQTTString_initializer; + if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, + &payload, &payloadlen, buf, buflen) == 1) + strindex = MQTTStringFormat_publish(strbuf, strbuflen, dup, qos, retained, packetid, + topicName, payload, payloadlen); + } + break; + case PUBACK: + case PUBREC: + case PUBREL: + case PUBCOMP: + { + unsigned char packettype, dup; + unsigned short packetid; + if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1) + strindex = MQTTStringFormat_ack(strbuf, strbuflen, packettype, dup, packetid); + } + break; + case SUBSCRIBE: + { + unsigned char dup; + unsigned short packetid; + int maxcount = 1, count = 0; + MQTTString topicFilters[1]; + int requestedQoSs[1]; + if (MQTTDeserialize_subscribe(&dup, &packetid, maxcount, &count, + topicFilters, requestedQoSs, buf, buflen) == 1) + strindex = MQTTStringFormat_subscribe(strbuf, strbuflen, dup, packetid, count, topicFilters, requestedQoSs);; + } + break; + case UNSUBSCRIBE: + { + unsigned char dup; + unsigned short packetid; + int maxcount = 1, count = 0; + MQTTString topicFilters[1]; + if (MQTTDeserialize_unsubscribe(&dup, &packetid, maxcount, &count, topicFilters, buf, buflen) == 1) + strindex = MQTTStringFormat_unsubscribe(strbuf, strbuflen, dup, packetid, count, topicFilters); + } + break; + case PINGREQ: + case PINGRESP: + case DISCONNECT: + strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]); + break; + } + strbuf[strbuflen] = '\0'; + return strbuf; +} +#endif diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTFormat.h b/APP_Framework/Applications/app_test/test_mqttclient/MQTTFormat.h new file mode 100644 index 000000000..f7bd0d165 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTFormat.h @@ -0,0 +1,37 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#if !defined(MQTTFORMAT_H) +#define MQTTFORMAT_H + +#include "StackTrace.h" +#include "MQTTPacket.h" + +const char* MQTTPacket_getName(unsigned short packetid); +int MQTTStringFormat_connect(char* strbuf, int strbuflen, MQTTPacket_connectData* data); +int MQTTStringFormat_connack(char* strbuf, int strbuflen, unsigned char connack_rc, unsigned char sessionPresent); +int MQTTStringFormat_publish(char* strbuf, int strbuflen, unsigned char dup, int qos, unsigned char retained, + unsigned short packetid, MQTTString topicName, unsigned char* payload, int payloadlen); +int MQTTStringFormat_ack(char* strbuf, int strbuflen, unsigned char packettype, unsigned char dup, unsigned short packetid); +int MQTTStringFormat_subscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid, int count, + MQTTString topicFilters[], int requestedQoSs[]); +int MQTTStringFormat_suback(char* strbuf, int strbuflen, unsigned short packetid, int count, int* grantedQoSs); +int MQTTStringFormat_unsubscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid, + int count, MQTTString topicFilters[]); +char* MQTTFormat_toClientString(char* strbuf, int strbuflen, unsigned char* buf, int buflen); +char* MQTTFormat_toServerString(char* strbuf, int strbuflen, unsigned char* buf, int buflen); + +#endif diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTPacket.c b/APP_Framework/Applications/app_test/test_mqttclient/MQTTPacket.c new file mode 100644 index 000000000..9c3f958bb --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTPacket.c @@ -0,0 +1,416 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Sergio R. Caprile - non-blocking packet read functions for stream transport + *******************************************************************************/ + +#include "StackTrace.h" +#include "MQTTPacket.h" + +#include + +/** + * Encodes the message length according to the MQTT algorithm + * @param buf the buffer into which the encoded data is written + * @param length the length to be encoded + * @return the number of bytes written to buffer + */ +int MQTTPacket_encode(unsigned char* buf, int length) +{ + int rc = 0; + + FUNC_ENTRY; + do + { + char d = length % 128; + length /= 128; + /* if there are more digits to encode, set the top bit of this digit */ + if (length > 0) + d |= 0x80; + buf[rc++] = d; + } while (length > 0); + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Decodes the message length according to the MQTT algorithm + * @param getcharfn pointer to function to read the next character from the data source + * @param value the decoded length returned + * @return the number of bytes read from the socket + */ +int MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int* value) +{ + unsigned char c; + int multiplier = 1; + int len = 0; +#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 + + FUNC_ENTRY; + *value = 0; + do + { + int rc = MQTTPACKET_READ_ERROR; + + if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) + { + rc = MQTTPACKET_READ_ERROR; /* bad data */ + goto exit; + } + rc = (*getcharfn)(&c, 1); + if (rc != 1) + goto exit; + *value += (c & 127) * multiplier; + multiplier *= 128; + } while ((c & 128) != 0); +exit: + FUNC_EXIT_RC(len); + return len; +} + + +int MQTTPacket_len(int rem_len) +{ + rem_len += 1; /* header byte */ + + /* now remaining_length field */ + if (rem_len < 128) + rem_len += 1; + else if (rem_len < 16384) + rem_len += 2; + else if (rem_len < 2097151) + rem_len += 3; + else + rem_len += 4; + return rem_len; +} + + +static unsigned char* bufptr; + +int bufchar(unsigned char* c, int count) +{ + int i; + + for (i = 0; i < count; ++i) + *c = *bufptr++; + return count; +} + + +int MQTTPacket_decodeBuf(unsigned char* buf, int* value) +{ + bufptr = buf; + return MQTTPacket_decode(bufchar, value); +} + + +/** + * Calculates an integer from two bytes read from the input buffer + * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned + * @return the integer value calculated + */ +int readInt(unsigned char** pptr) +{ + unsigned char* ptr = *pptr; + int len = 256*(*ptr) + (*(ptr+1)); + *pptr += 2; + return len; +} + + +/** + * Reads one character from the input buffer. + * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned + * @return the character read + */ +char readChar(unsigned char** pptr) +{ + char c = **pptr; + (*pptr)++; + return c; +} + + +/** + * Writes one character to an output buffer. + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param c the character to write + */ +void writeChar(unsigned char** pptr, char c) +{ + **pptr = c; + (*pptr)++; +} + + +/** + * Writes an integer as 2 bytes to an output buffer. + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param anInt the integer to write + */ +void writeInt(unsigned char** pptr, int anInt) +{ + **pptr = (unsigned char)(anInt / 256); + (*pptr)++; + **pptr = (unsigned char)(anInt % 256); + (*pptr)++; +} + + +/** + * Writes a "UTF" string to an output buffer. Converts C string to length-delimited. + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param string the C string to write + */ +void writeCString(unsigned char** pptr, const char* string) +{ + int len = strlen(string); + writeInt(pptr, len); + memcpy(*pptr, string, len); + *pptr += len; +} + + +int getLenStringLen(char* ptr) +{ + int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1)); + return len; +} + + +void writeMQTTString(unsigned char** pptr, MQTTString mqttstring) +{ + if (mqttstring.lenstring.len > 0) + { + writeInt(pptr, mqttstring.lenstring.len); + memcpy(*pptr, mqttstring.lenstring.data, mqttstring.lenstring.len); + *pptr += mqttstring.lenstring.len; + } + else if (mqttstring.cstring) + writeCString(pptr, mqttstring.cstring); + else + writeInt(pptr, 0); +} + + +/** + * @param mqttstring the MQTTString structure into which the data is to be read + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param enddata pointer to the end of the data: do not read beyond + * @return 1 if successful, 0 if not + */ +int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned char* enddata) +{ + int rc = 0; + + FUNC_ENTRY; + /* the first two bytes are the length of the string */ + if (enddata - (*pptr) > 1) /* enough length to read the integer? */ + { + mqttstring->lenstring.len = readInt(pptr); /* increments pptr to point past length */ + if (&(*pptr)[mqttstring->lenstring.len] <= enddata) + { + mqttstring->lenstring.data = (char*)*pptr; + *pptr += mqttstring->lenstring.len; + rc = 1; + } + } + mqttstring->cstring = NULL; + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Return the length of the MQTTstring - C string if there is one, otherwise the length delimited string + * @param mqttstring the string to return the length of + * @return the length of the string + */ +int MQTTstrlen(MQTTString mqttstring) +{ + int rc = 0; + + if (mqttstring.cstring) + rc = strlen(mqttstring.cstring); + else + rc = mqttstring.lenstring.len; + return rc; +} + + +/** + * Compares an MQTTString to a C string + * @param a the MQTTString to compare + * @param bptr the C string to compare + * @return boolean - equal or not + */ +int MQTTPacket_equals(MQTTString* a, char* bptr) +{ + int alen = 0, + blen = 0; + char *aptr; + + if (a->cstring) + { + aptr = a->cstring; + alen = strlen(a->cstring); + } + else + { + aptr = a->lenstring.data; + alen = a->lenstring.len; + } + blen = strlen(bptr); + + return (alen == blen) && (strncmp(aptr, bptr, alen) == 0); +} + + +/** + * Helper function to read packet data from some source into a buffer + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param getfn pointer to a function which will read any number of bytes from the needed source + * @return integer MQTT packet type, or -1 on error + * @note the whole message must fit into the caller's buffer + */ +int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int)) +{ + int rc = -1; + MQTTHeader header = {0}; + int len = 0; + int rem_len = 0; + + /* 1. read the header byte. This has the packet type in it */ + if ((*getfn)(buf, 1) != 1) + goto exit; + + len = 1; + /* 2. read the remaining length. This is variable in itself */ + MQTTPacket_decode(getfn, &rem_len); + len += MQTTPacket_encode(buf + 1, rem_len); /* put the original remaining length back into the buffer */ + + /* 3. read the rest of the buffer using a callback to supply the rest of the data */ + if((rem_len + len) > buflen){ + printf("buflen小于接收报文需要的长度"); + goto exit; + } + if (rem_len && ((*getfn)(buf + len, rem_len) != rem_len)){ + printf("读取的消息长度和负载长度不一致"); + goto exit; + } + + header.byte = buf[0]; + rc = header.bits.type; +exit: + return rc; +} + +/** + * Decodes the message length according to the MQTT algorithm, non-blocking + * @param trp pointer to a transport structure holding what is needed to solve getting data from it + * @param value the decoded length returned + * @return integer the number of bytes read from the socket, 0 for call again, or -1 on error + */ +static int MQTTPacket_decodenb(MQTTTransport *trp) +{ + unsigned char c; + int rc = MQTTPACKET_READ_ERROR; + + FUNC_ENTRY; + if(trp->len == 0){ /* initialize on first call */ + trp->multiplier = 1; + trp->rem_len = 0; + } + do { + int frc; + if (trp->len >= MAX_NO_OF_REMAINING_LENGTH_BYTES) + goto exit; + if ((frc=(*trp->getfn)(trp->sck, &c, 1)) == -1) + goto exit; + if (frc == 0){ + rc = 0; + goto exit; + } + ++(trp->len); + trp->rem_len += (c & 127) * trp->multiplier; + trp->multiplier *= 128; + } while ((c & 128) != 0); + rc = trp->len; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + +/** + * Helper function to read packet data from some source into a buffer, non-blocking + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param trp pointer to a transport structure holding what is needed to solve getting data from it + * @return integer MQTT packet type, 0 for call again, or -1 on error + * @note the whole message must fit into the caller's buffer + */ +int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp) +{ + int rc = -1, frc; + MQTTHeader header = {0}; + + switch(trp->state){ + default: + trp->state = 0; + /*FALLTHROUGH*/ + case 0: + /* read the header byte. This has the packet type in it */ + if ((frc=(*trp->getfn)(trp->sck, buf, 1)) == -1) + goto exit; + if (frc == 0) + return 0; + trp->len = 0; + ++trp->state; + /*FALLTHROUGH*/ + /* read the remaining length. This is variable in itself */ + case 1: + if((frc=MQTTPacket_decodenb(trp)) == MQTTPACKET_READ_ERROR) + goto exit; + if(frc == 0) + return 0; + trp->len = 1 + MQTTPacket_encode(buf + 1, trp->rem_len); /* put the original remaining length back into the buffer */ + if((trp->rem_len + trp->len) > buflen) + goto exit; + ++trp->state; + /*FALLTHROUGH*/ + case 2: + if(trp->rem_len){ + /* read the rest of the buffer using a callback to supply the rest of the data */ + if ((frc=(*trp->getfn)(trp->sck, buf + trp->len, trp->rem_len)) == -1) + goto exit; + if (frc == 0) + return 0; + trp->rem_len -= frc; + trp->len += frc; + if(trp->rem_len) + return 0; + } + header.byte = buf[0]; + rc = header.bits.type; + break; + } + +exit: + trp->state = 0; + return rc; +} + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTPacket.h b/APP_Framework/Applications/app_test/test_mqttclient/MQTTPacket.h new file mode 100644 index 000000000..df6f15513 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTPacket.h @@ -0,0 +1,133 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Xiang Rong - 442039 Add makefile to Embedded C client + *******************************************************************************/ + +#ifndef MQTTPACKET_H_ +#define MQTTPACKET_H_ + +#if defined(__cplusplus) /* If this is a C++ compiler, use C linkage */ +extern "C" { +#endif + +#if defined(WIN32_DLL) || defined(WIN64_DLL) + #define DLLImport __declspec(dllimport) + #define DLLExport __declspec(dllexport) +#elif defined(LINUX_SO) + #define DLLImport extern + #define DLLExport __attribute__ ((visibility ("default"))) +#else + #define DLLImport + #define DLLExport +#endif + +enum errors +{ + MQTTPACKET_BUFFER_TOO_SHORT = -2, + MQTTPACKET_READ_ERROR = -1, + MQTTPACKET_READ_COMPLETE +}; + +enum msgTypes +{ + CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, + PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, + PINGREQ, PINGRESP, DISCONNECT +}; + +/** + * Bitfields for the MQTT header byte. + */ +typedef union +{ + unsigned char byte; /**< the whole byte */ +#if defined(REVERSED) + struct + { + unsigned int type : 4; /**< message type nibble */ + unsigned int dup : 1; /**< DUP flag bit */ + unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */ + unsigned int retain : 1; /**< retained flag bit */ + } bits; +#else + struct + { + unsigned int retain : 1; /**< retained flag bit */ + unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */ + unsigned int dup : 1; /**< DUP flag bit */ + unsigned int type : 4; /**< message type nibble */ + } bits; +#endif +} MQTTHeader; + +typedef struct +{ + int len; + char* data; +} MQTTLenString; + +typedef struct +{ + char* cstring; + MQTTLenString lenstring; +} MQTTString; + +#define MQTTString_initializer {NULL, {0, NULL}} + +int MQTTstrlen(MQTTString mqttstring); + +#include "MQTTConnect.h" +#include "MQTTPublish.h" +#include "MQTTSubscribe.h" +#include "MQTTUnsubscribe.h" +#include "MQTTFormat.h" + +DLLExport int MQTTSerialize_ack(unsigned char* buf, int buflen, unsigned char type, unsigned char dup, unsigned short packetid); +DLLExport int MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen); + +int MQTTPacket_len(int rem_len); +DLLExport int MQTTPacket_equals(MQTTString* a, char* b); + +DLLExport int MQTTPacket_encode(unsigned char* buf, int length); +int MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int* value); +int MQTTPacket_decodeBuf(unsigned char* buf, int* value); + +int readInt(unsigned char** pptr); +char readChar(unsigned char** pptr); +void writeChar(unsigned char** pptr, char c); +void writeInt(unsigned char** pptr, int anInt); +int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned char* enddata); +void writeCString(unsigned char** pptr, const char* string); +void writeMQTTString(unsigned char** pptr, MQTTString mqttstring); + +DLLExport int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int)); + +typedef struct { + int (*getfn)(void *, unsigned char*, int); /* must return -1 for error, 0 for call again, or the number of bytes read */ + void *sck; /* pointer to whatever the system may use to identify the transport */ + int multiplier; + int rem_len; + int len; + char state; +}MQTTTransport; + +int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp); + +#ifdef __cplusplus /* If this is a C++ compiler, use C linkage */ +} +#endif + + +#endif /* MQTTPACKET_H_ */ diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTPublish.h b/APP_Framework/Applications/app_test/test_mqttclient/MQTTPublish.h new file mode 100644 index 000000000..d62dddb85 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTPublish.h @@ -0,0 +1,38 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Xiang Rong - 442039 Add makefile to Embedded C client + *******************************************************************************/ + +#ifndef MQTTPUBLISH_H_ +#define MQTTPUBLISH_H_ + +#if !defined(DLLImport) + #define DLLImport +#endif +#if !defined(DLLExport) + #define DLLExport +#endif + +DLLExport int MQTTSerialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid, + MQTTString topicName, unsigned char* payload, int payloadlen); + +DLLExport int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName, + unsigned char** payload, int* payloadlen, unsigned char* buf, int len); + +DLLExport int MQTTSerialize_puback(unsigned char* buf, int buflen, unsigned short packetid); +DLLExport int MQTTSerialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid); +DLLExport int MQTTSerialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid); + +#endif /* MQTTPUBLISH_H_ */ diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTSerializePublish.c b/APP_Framework/Applications/app_test/test_mqttclient/MQTTSerializePublish.c new file mode 100644 index 000000000..236791f42 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTSerializePublish.c @@ -0,0 +1,169 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs - fix for https://bugs.eclipse.org/bugs/show_bug.cgi?id=453144 + *******************************************************************************/ + +#include "MQTTPacket.h" +#include "StackTrace.h" + +#include + + +/** + * Determines the length of the MQTT publish packet that would be produced using the supplied parameters + * @param qos the MQTT QoS of the publish (packetid is omitted for QoS 0) + * @param topicName the topic name to be used in the publish + * @param payloadlen the length of the payload to be sent + * @return the length of buffer needed to contain the serialized version of the packet + */ +int MQTTSerialize_publishLength(int qos, MQTTString topicName, int payloadlen) +{ + int len = 0; + + len += 2 + MQTTstrlen(topicName) + payloadlen; + if (qos > 0) + len += 2; /* packetid */ + return len; +} + + +/** + * Serializes the supplied publish data into the supplied buffer, ready for sending + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param dup integer - the MQTT dup flag + * @param qos integer - the MQTT QoS value + * @param retained integer - the MQTT retained flag + * @param packetid integer - the MQTT packet identifier + * @param topicName MQTTString - the MQTT topic in the publish + * @param payload byte buffer - the MQTT publish payload + * @param payloadlen integer - the length of the MQTT payload + * @return the length of the serialized data. <= 0 indicates error + */ +int MQTTSerialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid, + MQTTString topicName, unsigned char* payload, int payloadlen) +{ + unsigned char *ptr = buf; + MQTTHeader header = {0}; + int rem_len = 0; + int rc = 0; + + FUNC_ENTRY; + if (MQTTPacket_len(rem_len = MQTTSerialize_publishLength(qos, topicName, payloadlen)) > buflen) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.bits.type = PUBLISH; + header.bits.dup = dup; + header.bits.qos = qos; + header.bits.retain = retained; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; + + writeMQTTString(&ptr, topicName); + + if (qos > 0) + writeInt(&ptr, packetid); + + memcpy(ptr, payload, payloadlen); + ptr += payloadlen; + + rc = ptr - buf; + +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + + +/** + * Serializes the ack packet into the supplied buffer. + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param type the MQTT packet type + * @param dup the MQTT dup flag + * @param packetid the MQTT packet identifier + * @return serialized length, or error if 0 + */ +int MQTTSerialize_ack(unsigned char* buf, int buflen, unsigned char packettype, unsigned char dup, unsigned short packetid) +{ + MQTTHeader header = {0}; + int rc = 0; + unsigned char *ptr = buf; + + FUNC_ENTRY; + if (buflen < 4) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + header.bits.type = packettype; + header.bits.dup = dup; + header.bits.qos = (packettype == PUBREL) ? 1 : 0; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */ + writeInt(&ptr, packetid); + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes a puback packet into the supplied buffer. + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param packetid integer - the MQTT packet identifier + * @return serialized length, or error if 0 + */ +int MQTTSerialize_puback(unsigned char* buf, int buflen, unsigned short packetid) +{ + return MQTTSerialize_ack(buf, buflen, PUBACK, 0, packetid); +} + + +/** + * Serializes a pubrel packet into the supplied buffer. + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param dup integer - the MQTT dup flag + * @param packetid integer - the MQTT packet identifier + * @return serialized length, or error if 0 + */ +int MQTTSerialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid) +{ + return MQTTSerialize_ack(buf, buflen, PUBREL, dup, packetid); +} + + +/** + * Serializes a pubrel packet into the supplied buffer. + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param packetid integer - the MQTT packet identifier + * @return serialized length, or error if 0 + */ +int MQTTSerialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid) +{ + return MQTTSerialize_ack(buf, buflen, PUBCOMP, 0, packetid); +} + + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTSubscribe.h b/APP_Framework/Applications/app_test/test_mqttclient/MQTTSubscribe.h new file mode 100644 index 000000000..383ca0d2f --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTSubscribe.h @@ -0,0 +1,39 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Xiang Rong - 442039 Add makefile to Embedded C client + *******************************************************************************/ + +#ifndef MQTTSUBSCRIBE_H_ +#define MQTTSUBSCRIBE_H_ + +#if !defined(DLLImport) + #define DLLImport +#endif +#if !defined(DLLExport) + #define DLLExport +#endif + +DLLExport int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, + int count, MQTTString topicFilters[], int requestedQoSs[]); + +DLLExport int MQTTDeserialize_subscribe(unsigned char* dup, unsigned short* packetid, + int maxcount, int* count, MQTTString topicFilters[], int requestedQoSs[], unsigned char* buf, int len); + +DLLExport int MQTTSerialize_suback(unsigned char* buf, int buflen, unsigned short packetid, int count, int* grantedQoSs); + +DLLExport int MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int len); + + +#endif /* MQTTSUBSCRIBE_H_ */ diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTSubscribeClient.c b/APP_Framework/Applications/app_test/test_mqttclient/MQTTSubscribeClient.c new file mode 100644 index 000000000..5b1ca2866 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTSubscribeClient.c @@ -0,0 +1,137 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTPacket.h" +#include "StackTrace.h" + +#include + +/** + * Determines the length of the MQTT subscribe packet that would be produced using the supplied parameters + * @param count the number of topic filter strings in topicFilters + * @param topicFilters the array of topic filter strings to be used in the publish + * @return the length of buffer needed to contain the serialized version of the packet + */ +int MQTTSerialize_subscribeLength(int count, MQTTString topicFilters[]) +{ + int i; + int len = 2; /* packetid */ + + for (i = 0; i < count; ++i) + len += 2 + MQTTstrlen(topicFilters[i]) + 1; /* length + topic + req_qos */ + return len; +} + + +/** + * Serializes the supplied subscribe data into the supplied buffer, ready for sending + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied bufferr + * @param dup integer - the MQTT dup flag + * @param packetid integer - the MQTT packet identifier + * @param count - number of members in the topicFilters and reqQos arrays + * @param topicFilters - array of topic filter names + * @param requestedQoSs - array of requested QoS + * @return the length of the serialized data. <= 0 indicates error + */ +int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, int count, + MQTTString topicFilters[], int requestedQoSs[]) +{ + unsigned char *ptr = buf; + MQTTHeader header = {0}; + int rem_len = 0; + int rc = 0; + int i = 0; + + FUNC_ENTRY; + if (MQTTPacket_len(rem_len = MQTTSerialize_subscribeLength(count, topicFilters)) > buflen) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.byte = 0; + header.bits.type = SUBSCRIBE; + header.bits.dup = dup; + header.bits.qos = 1; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; + + writeInt(&ptr, packetid); + + for (i = 0; i < count; ++i) + { + writeMQTTString(&ptr, topicFilters[i]); + writeChar(&ptr, requestedQoSs[i]); + } + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + + +/** + * Deserializes the supplied (wire) buffer into suback data + * @param packetid returned integer - the MQTT packet identifier + * @param maxcount - the maximum number of members allowed in the grantedQoSs array + * @param count returned integer - number of members in the grantedQoSs array + * @param grantedQoSs returned array of integers - the granted qualities of service + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return error code. 1 is success, 0 is failure + */ +int MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int buflen) +{ + MQTTHeader header = {0}; + unsigned char* curdata = buf; + unsigned char* enddata = NULL; + int rc = 0; + int mylen; + + FUNC_ENTRY; + header.byte = readChar(&curdata); + if (header.bits.type != SUBACK) + goto exit; + + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + if (enddata - curdata < 2) + goto exit; + + *packetid = readInt(&curdata); + + *count = 0; + while (curdata < enddata) + { + if (*count > maxcount) + { + rc = -1; + goto exit; + } + grantedQoSs[(*count)++] = readChar(&curdata); + } + + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTSubscribeServer.c b/APP_Framework/Applications/app_test/test_mqttclient/MQTTSubscribeServer.c new file mode 100644 index 000000000..15bb15daf --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTSubscribeServer.c @@ -0,0 +1,112 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTPacket.h" +#include "StackTrace.h" + +#include + + +/** + * Deserializes the supplied (wire) buffer into subscribe data + * @param dup integer returned - the MQTT dup flag + * @param packetid integer returned - the MQTT packet identifier + * @param maxcount - the maximum number of members allowed in the topicFilters and requestedQoSs arrays + * @param count - number of members in the topicFilters and requestedQoSs arrays + * @param topicFilters - array of topic filter names + * @param requestedQoSs - array of requested QoS + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return the length of the serialized data. <= 0 indicates error + */ +int MQTTDeserialize_subscribe(unsigned char* dup, unsigned short* packetid, int maxcount, int* count, MQTTString topicFilters[], + int requestedQoSs[], unsigned char* buf, int buflen) +{ + MQTTHeader header = {0}; + unsigned char* curdata = buf; + unsigned char* enddata = NULL; + int rc = -1; + int mylen = 0; + + FUNC_ENTRY; + header.byte = readChar(&curdata); + if (header.bits.type != SUBSCRIBE) + goto exit; + *dup = header.bits.dup; + + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + + *packetid = readInt(&curdata); + + *count = 0; + while (curdata < enddata) + { + if (!readMQTTLenString(&topicFilters[*count], &curdata, enddata)) + goto exit; + if (curdata >= enddata) /* do we have enough data to read the req_qos version byte? */ + goto exit; + requestedQoSs[*count] = readChar(&curdata); + (*count)++; + } + + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes the supplied suback data into the supplied buffer, ready for sending + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param packetid integer - the MQTT packet identifier + * @param count - number of members in the grantedQoSs array + * @param grantedQoSs - array of granted QoS + * @return the length of the serialized data. <= 0 indicates error + */ +int MQTTSerialize_suback(unsigned char* buf, int buflen, unsigned short packetid, int count, int* grantedQoSs) +{ + MQTTHeader header = {0}; + int rc = -1; + unsigned char *ptr = buf; + int i; + + FUNC_ENTRY; + if (buflen < 2 + count) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + header.byte = 0; + header.bits.type = SUBACK; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, 2 + count); /* write remaining length */ + + writeInt(&ptr, packetid); + + for (i = 0; i < count; ++i) + writeChar(&ptr, grantedQoSs[i]); + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTUnsubscribe.h b/APP_Framework/Applications/app_test/test_mqttclient/MQTTUnsubscribe.h new file mode 100644 index 000000000..1644ae518 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTUnsubscribe.h @@ -0,0 +1,38 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Xiang Rong - 442039 Add makefile to Embedded C client + *******************************************************************************/ + +#ifndef MQTTUNSUBSCRIBE_H_ +#define MQTTUNSUBSCRIBE_H_ + +#if !defined(DLLImport) + #define DLLImport +#endif +#if !defined(DLLExport) + #define DLLExport +#endif + +DLLExport int MQTTSerialize_unsubscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, + int count, MQTTString topicFilters[]); + +DLLExport int MQTTDeserialize_unsubscribe(unsigned char* dup, unsigned short* packetid, int max_count, int* count, MQTTString topicFilters[], + unsigned char* buf, int len); + +DLLExport int MQTTSerialize_unsuback(unsigned char* buf, int buflen, unsigned short packetid); + +DLLExport int MQTTDeserialize_unsuback(unsigned short* packetid, unsigned char* buf, int len); + +#endif /* MQTTUNSUBSCRIBE_H_ */ diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTUnsubscribeClient.c b/APP_Framework/Applications/app_test/test_mqttclient/MQTTUnsubscribeClient.c new file mode 100644 index 000000000..0f8db7b69 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTUnsubscribeClient.c @@ -0,0 +1,106 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTPacket.h" +#include "StackTrace.h" + +#include + +/** + * Determines the length of the MQTT unsubscribe packet that would be produced using the supplied parameters + * @param count the number of topic filter strings in topicFilters + * @param topicFilters the array of topic filter strings to be used in the publish + * @return the length of buffer needed to contain the serialized version of the packet + */ +int MQTTSerialize_unsubscribeLength(int count, MQTTString topicFilters[]) +{ + int i; + int len = 2; /* packetid */ + + for (i = 0; i < count; ++i) + len += 2 + MQTTstrlen(topicFilters[i]); /* length + topic*/ + return len; +} + + +/** + * Serializes the supplied unsubscribe data into the supplied buffer, ready for sending + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @param dup integer - the MQTT dup flag + * @param packetid integer - the MQTT packet identifier + * @param count - number of members in the topicFilters array + * @param topicFilters - array of topic filter names + * @return the length of the serialized data. <= 0 indicates error + */ +int MQTTSerialize_unsubscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, + int count, MQTTString topicFilters[]) +{ + unsigned char *ptr = buf; + MQTTHeader header = {0}; + int rem_len = 0; + int rc = -1; + int i = 0; + + FUNC_ENTRY; + if (MQTTPacket_len(rem_len = MQTTSerialize_unsubscribeLength(count, topicFilters)) > buflen) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.byte = 0; + header.bits.type = UNSUBSCRIBE; + header.bits.dup = dup; + header.bits.qos = 1; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; + + writeInt(&ptr, packetid); + + for (i = 0; i < count; ++i) + writeMQTTString(&ptr, topicFilters[i]); + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Deserializes the supplied (wire) buffer into unsuback data + * @param packetid returned integer - the MQTT packet identifier + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return error code. 1 is success, 0 is failure + */ +int MQTTDeserialize_unsuback(unsigned short* packetid, unsigned char* buf, int buflen) +{ + unsigned char type = 0; + unsigned char dup = 0; + int rc = 0; + + FUNC_ENTRY; + rc = MQTTDeserialize_ack(&type, &dup, packetid, buf, buflen); + if (type == UNSUBACK) + rc = 1; + FUNC_EXIT_RC(rc); + return rc; +} + + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/MQTTUnsubscribeServer.c b/APP_Framework/Applications/app_test/test_mqttclient/MQTTUnsubscribeServer.c new file mode 100644 index 000000000..b0e427d57 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/MQTTUnsubscribeServer.c @@ -0,0 +1,102 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTPacket.h" +#include "StackTrace.h" + +#include + + +/** + * Deserializes the supplied (wire) buffer into unsubscribe data + * @param dup integer returned - the MQTT dup flag + * @param packetid integer returned - the MQTT packet identifier + * @param maxcount - the maximum number of members allowed in the topicFilters and requestedQoSs arrays + * @param count - number of members in the topicFilters and requestedQoSs arrays + * @param topicFilters - array of topic filter names + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return the length of the serialized data. <= 0 indicates error + */ +int MQTTDeserialize_unsubscribe(unsigned char* dup, unsigned short* packetid, int maxcount, int* count, MQTTString topicFilters[], + unsigned char* buf, int len) +{ + MQTTHeader header = {0}; + unsigned char* curdata = buf; + unsigned char* enddata = NULL; + int rc = 0; + int mylen = 0; + + FUNC_ENTRY; + header.byte = readChar(&curdata); + if (header.bits.type != UNSUBSCRIBE) + goto exit; + *dup = header.bits.dup; + + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + + *packetid = readInt(&curdata); + + *count = 0; + while (curdata < enddata) + { + if (!readMQTTLenString(&topicFilters[*count], &curdata, enddata)) + goto exit; + (*count)++; + } + + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes the supplied unsuback data into the supplied buffer, ready for sending + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param packetid integer - the MQTT packet identifier + * @return the length of the serialized data. <= 0 indicates error + */ +int MQTTSerialize_unsuback(unsigned char* buf, int buflen, unsigned short packetid) +{ + MQTTHeader header = {0}; + int rc = 0; + unsigned char *ptr = buf; + + FUNC_ENTRY; + if (buflen < 2) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + header.byte = 0; + header.bits.type = UNSUBACK; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */ + + writeInt(&ptr, packetid); + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/Result.png b/APP_Framework/Applications/app_test/test_mqttclient/Result.png new file mode 100644 index 000000000..e765f0751 Binary files /dev/null and b/APP_Framework/Applications/app_test/test_mqttclient/Result.png differ diff --git a/APP_Framework/Applications/app_test/test_mqttclient/StackTrace.h b/APP_Framework/Applications/app_test/test_mqttclient/StackTrace.h new file mode 100644 index 000000000..c65a2ef43 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/StackTrace.h @@ -0,0 +1,78 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs - fix for bug #434081 + *******************************************************************************/ + +#ifndef STACKTRACE_H_ +#define STACKTRACE_H_ + +#include +#define NOSTACKTRACE 1 + +#if defined(NOSTACKTRACE) +#define FUNC_ENTRY +#define FUNC_ENTRY_NOLOG +#define FUNC_ENTRY_MED +#define FUNC_ENTRY_MAX +#define FUNC_EXIT +#define FUNC_EXIT_NOLOG +#define FUNC_EXIT_MED +#define FUNC_EXIT_MAX +#define FUNC_EXIT_RC(x) +#define FUNC_EXIT_MED_RC(x) +#define FUNC_EXIT_MAX_RC(x) + +#else + +#if defined(WIN32) +#define inline __inline +#define FUNC_ENTRY StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MINIMUM) +#define FUNC_ENTRY_NOLOG StackTrace_entry(__FUNCTION__, __LINE__, -1) +#define FUNC_ENTRY_MED StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MEDIUM) +#define FUNC_ENTRY_MAX StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MAXIMUM) +#define FUNC_EXIT StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MINIMUM) +#define FUNC_EXIT_NOLOG StackTrace_exit(__FUNCTION__, __LINE__, -1) +#define FUNC_EXIT_MED StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MEDIUM) +#define FUNC_EXIT_MAX StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MAXIMUM) +#define FUNC_EXIT_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MINIMUM) +#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MEDIUM) +#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MAXIMUM) +#else +#define FUNC_ENTRY StackTrace_entry(__func__, __LINE__, TRACE_MINIMUM) +#define FUNC_ENTRY_NOLOG StackTrace_entry(__func__, __LINE__, -1) +#define FUNC_ENTRY_MED StackTrace_entry(__func__, __LINE__, TRACE_MEDIUM) +#define FUNC_ENTRY_MAX StackTrace_entry(__func__, __LINE__, TRACE_MAXIMUM) +#define FUNC_EXIT StackTrace_exit(__func__, __LINE__, NULL, TRACE_MINIMUM) +#define FUNC_EXIT_NOLOG StackTrace_exit(__func__, __LINE__, NULL, -1) +#define FUNC_EXIT_MED StackTrace_exit(__func__, __LINE__, NULL, TRACE_MEDIUM) +#define FUNC_EXIT_MAX StackTrace_exit(__func__, __LINE__, NULL, TRACE_MAXIMUM) +#define FUNC_EXIT_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MINIMUM) +#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MEDIUM) +#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MAXIMUM) + +void StackTrace_entry(const char* name, int line, int trace); +void StackTrace_exit(const char* name, int line, void* return_value, int trace); + +void StackTrace_printStack(FILE* dest); +char* StackTrace_get(unsigned long); + +#endif + +#endif + + + + +#endif /* STACKTRACE_H_ */ diff --git a/APP_Framework/Applications/app_test/test_mqttclient/mqttclient.c b/APP_Framework/Applications/app_test/test_mqttclient/mqttclient.c new file mode 100644 index 000000000..97d519237 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/mqttclient.c @@ -0,0 +1,867 @@ +#include +#include "mqttclient.h" +#include "transport.h" +#include "MQTTPacket.h" + + +#include "string.h" +#include "sockets.h" + +#include "lwip/opt.h" + +#include "lwip/sys.h" +#include "lwip/api.h" + +#include "lwip/sockets.h" + + + +#include + +/******************************* 全局变量声明 ************************************/ +/* + * 当我们在写应用程序的时候,可能需要用到一些全局变量。 + */ +//extern QueueHandle_t MQTT_Data_Queue; + + +//定义用户消息结构体 +MQTT_USER_MSG mqtt_user_msg; + +int32_t MQTT_socket = 0; + + +void deliverMessage(MQTTString *TopicName,MQTTMessage *msg,MQTT_USER_MSG *mqtt_user_msg); + +/************************************************************************ +** 函数名称: MQTTConnect +** 函数功能: 初始化客户端并登录服务器 +** 入口参数: int32_t sock:网络描述符 +** 出口参数: >=0:发送成功 <0:发送失败 +** 备 注: +************************************************************************/ +uint8_t MQTTConnect(void) +{ + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; + uint8_t buf[200]; + int buflen = sizeof(buf); + int len = 0; + data.clientID.cstring = CLIENT_ID; //随机 + data.keepAliveInterval = KEEPLIVE_TIME; //保持活跃 + data.username.cstring = USER_NAME; //用户名 + data.password.cstring = PASSWORD; //密钥 + data.MQTTVersion = MQTT_VERSION; //3表示3.1版本,4表示3.11版本 + data.cleansession = 1; + //组装消息 + len = MQTTSerialize_connect((unsigned char *)buf, buflen, &data); + //发送消息 + transport_sendPacketBuffer(buf, len); + + /* 等待连接响应 */ + if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK) + { + unsigned char sessionPresent, connack_rc; + if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0) + { + printf("无法连接,错误代码是: %d!\n", connack_rc); + return Connect_NOK; + } + else + { + printf("用户名与密钥验证成功,MQTT连接成功!\n"); + return Connect_OK; + } + } + else + printf("MQTT连接无响应!\n"); + return Connect_NOTACK; +} + + +/************************************************************************ +** 函数名称: MQTTPingReq +** 函数功能: 发送MQTT心跳包 +** 入口参数: 无 +** 出口参数: >=0:发送成功 <0:发送失败 +** 备 注: +************************************************************************/ +int32_t MQTTPingReq(int32_t sock) +{ + int32_t len; + uint8_t buf[200]; + int32_t buflen = sizeof(buf); + fd_set readfd; + struct timeval tv; + tv.tv_sec = 5; + tv.tv_usec = 0; + + FD_ZERO(&readfd); + FD_SET(sock,&readfd); + + len = MQTTSerialize_pingreq(buf, buflen); + transport_sendPacketBuffer(buf, len); + + //等待可读事件 + if(select(sock+1,&readfd,NULL,NULL,&tv) == 0) + return -1; + + //有可读事件 + if(FD_ISSET(sock,&readfd) == 0) + return -2; + + if(MQTTPacket_read(buf, buflen, transport_getdata) != PINGRESP) + return -3; + + return 0; + +} + + +/************************************************************************ +** 函数名称: MQTTSubscribe +** 函数功能: 订阅消息 +** 入口参数: int32_t sock:套接字 +** int8_t *topic:主题 +** enum QoS pos:消息质量 +** 出口参数: >=0:发送成功 <0:发送失败 +** 备 注: +************************************************************************/ +int32_t MQTTSubscribe(int32_t sock,char *topic,enum QoS pos) +{ + static uint32_t PacketID = 0; + uint16_t packetidbk = 0; + int conutbk = 0; + uint8_t buf[100]; + int32_t buflen = sizeof(buf); + MQTTString topicString = MQTTString_initializer; + int32_t len; + int req_qos,qosbk; + + fd_set readfd; + struct timeval tv; + tv.tv_sec = 2; + tv.tv_usec = 0; + + FD_ZERO(&readfd); + FD_SET(sock,&readfd); + + //复制主题 + topicString.cstring = (char *)topic; + //订阅质量 + req_qos = pos; + + //串行化订阅消息 + len = MQTTSerialize_subscribe(buf, buflen, 0, PacketID++, 1, &topicString, &req_qos); + //发送TCP数据 + if(transport_sendPacketBuffer(buf, len) < 0) + return -1; + + //等待可读事件--等待超时 + if(select(sock+1,&readfd,NULL,NULL,&tv) == 0) + return -2; + //有可读事件--没有可读事件 + if(FD_ISSET(sock,&readfd) == 0) + return -3; + + //等待订阅返回--未收到订阅返回 + if(MQTTPacket_read(buf, buflen, transport_getdata) != SUBACK) + return -4; + + //拆订阅回应包 + if(MQTTDeserialize_suback(&packetidbk,1, &conutbk, &qosbk, buf, buflen) != 1) + return -5; + + //检测返回数据的正确性 + if((qosbk == 0x80)||(packetidbk != (PacketID-1))) + return -6; + + //订阅成功 + return 0; +} + + +/************************************************************************ +** 函数名称: UserMsgCtl +** 函数功能: 用户消息处理函数 +** 入口参数: MQTT_USER_MSG *msg:消息结构体指针 +** 出口参数: 无 +** 备 注: +************************************************************************/ +void UserMsgCtl(MQTT_USER_MSG *msg) +{ + //这里处理数据只是打印,用户可以在这里添加自己的处理方式 + printf("*****收到订阅的消息!******\n"); + //返回后处理消息 + switch(msg->msgqos) + { + case 0: + printf("MQTT>>消息质量:QoS0\n"); + break; + case 1: + printf("MQTT>>消息质量:QoS1\n"); + break; + case 2: + printf("MQTT>>消息质量:QoS2\n"); + break; + default: + printf("MQTT>>错误的消息质量\n"); + break; + } + printf("MQTT>>消息主题:%s\n",msg->topic); + //printf("MQTT>>消息类容:%s\n",msg->msg); + printf("MQTT>>消息长度:%d\n",msg->msglenth); + //Process_msg(msg->msg, msg->msglenth); + //处理完后销毁数据 + msg->valid = 0; +} + +/************************************************************************ +** 函数名称: GetNextPackID +** 函数功能: 产生下一个数据包ID +** 入口参数: 无 +** 出口参数: uint16_t packetid:产生的ID +** 备 注: +************************************************************************/ +uint16_t GetNextPackID(void) +{ + static uint16_t pubpacketid = 0; + return pubpacketid++; +} + +/************************************************************************ +** 函数名称: mqtt_msg_publish +** 函数功能: 用户推送消息 +** 入口参数: MQTT_USER_MSG *msg:消息结构体指针 +** 出口参数: >=0:发送成功 <0:发送失败 +** 备 注: +************************************************************************/ +int32_t MQTTMsgPublish(int32_t sock, char *topic, int8_t qos, uint8_t* msg) +{ + int8_t retained = 0; //保留标志位 + uint32_t msg_len; //数据长度 + //uint8_t buf[MSG_MAX_LEN]; + //int32_t buflen = sizeof(buf),len; + int32_t len; + uint8_t *buf=(uint8_t*)x_malloc(MSG_MAX_LEN*sizeof(uint8_t)); + if(buf==NULL){ + printf("申请内存失败:Publish"); + return -1; + } + int32_t buflen = MSG_MAX_LEN*sizeof(uint8_t); + MQTTString topicString = MQTTString_initializer; + uint16_t packid = 0,packetidbk; + + //填充主题 + topicString.cstring = (char *)topic; + + //填充数据包ID + if((qos == QOS1)||(qos == QOS2)) + { + packid = GetNextPackID(); + } + else + { + qos = QOS0; + retained = 0; + packid = 0; + } + + msg_len = strlen((char *)msg); + + //推送消息 + len = MQTTSerialize_publish(buf, buflen, 0, qos, retained, packid, topicString, (unsigned char*)msg, msg_len); + if(len <= 0) + return -1; + if(transport_sendPacketBuffer(buf, len) < 0) + return -2; + + //质量等级0,不需要返回 + if(qos == QOS0) + { + free(buf); + return 0; + } + + //等级1 + if(qos == QOS1) + { + free(buf); + //等待PUBACK + if(WaitForPacket(sock,PUBACK,5) < 0) + return -3; + return 1; + + } + //等级2 + if(qos == QOS2) + { + //等待PUBREC + if(WaitForPacket(sock,PUBREC,5) < 0){ + free(buf); + return -3; + } + //发送PUBREL + len = MQTTSerialize_pubrel(buf, buflen,0, packetidbk); + if(len == 0){ + free(buf); + return -4; + } + + if(transport_sendPacketBuffer(buf, len) < 0){ + free(buf); + return -6; + } + //等待PUBCOMP + if(WaitForPacket(sock,PUBREC,5) < 0){ + free(buf); + return -7; + } + free(buf); + return 2; + } + //等级错误 + free(buf); + return -8; +} + +/************************************************************************ +** 函数名称: ReadPacketTimeout +** 函数功能: 阻塞读取MQTT数据 +** 入口参数: int32_t sock:网络描述符 +** uint8_t *buf:数据缓存区 +** int32_t buflen:缓冲区大小 +** mqtt_user_msg uint32_t timeout:超时时间--0-表示直接查询,没有数据立即返回 +** 出口参数: -1:错误,其他--包类型 +** 备 注: +************************************************************************/ +int32_t ReadPacketTimeout(int32_t sock,uint8_t *buf,int32_t buflen,uint32_t timeout) +{ + fd_set readfd; + struct timeval tv; + if(timeout != 0) + { + tv.tv_sec = timeout; + tv.tv_usec = 0; + FD_ZERO(&readfd); + FD_SET(sock,&readfd); + + //等待可读事件--等待超时 + if(select(sock+1,&readfd,NULL,NULL,&tv) == 0) + return -1; + //有可读事件--没有可读事件 + if(FD_ISSET(sock,&readfd) == 0) + return -1; + } + //读取TCP/IP事件 + return MQTTPacket_read(buf, buflen, transport_getdata); +} + + +/************************************************************************ +** 函数名称: deliverMessage +** 函数功能: 接受服务器发来的消息 +** 入口参数: MQTTMessage *msg:MQTT消息结构体 +** MQTT_USER_MSG *mqtt_user_msg:用户接受结构体 +** MQTTString *TopicName:主题 +** 出口参数: 无 +** 备 注: +************************************************************************/ +void deliverMessage(MQTTString *TopicName,MQTTMessage *msg,MQTT_USER_MSG *mqtt_user_msg) +{ + //消息质量 + mqtt_user_msg->msgqos = msg->qos; + //保存消息 + memcpy(mqtt_user_msg->msg,msg->payload,msg->payloadlen); + mqtt_user_msg->msg[msg->payloadlen] = 0; + //保存消息长度 + mqtt_user_msg->msglenth = msg->payloadlen; + //消息主题 + memcpy((char *)mqtt_user_msg->topic,TopicName->lenstring.data,TopicName->lenstring.len); + mqtt_user_msg->topic[TopicName->lenstring.len] = 0; + //消息ID + mqtt_user_msg->packetid = msg->id; + //标明消息合法 + mqtt_user_msg->valid = 1; +} + + +/************************************************************************ +** 函数名称: mqtt_pktype_ctl +** 函数功能: 根据包类型进行处理 +** 入口参数: uint8_t packtype:包类型 +** 出口参数: 无 +** 备 注: +************************************************************************/ +void mqtt_pktype_ctl(uint8_t packtype,uint8_t *buf,uint32_t buflen) +{ + MQTTMessage msg; + int32_t rc; + MQTTString receivedTopic; + uint32_t len; + switch(packtype) + { + case PUBLISH: + //拆析PUBLISH消息 + if(MQTTDeserialize_publish(&msg.dup,(int*)&msg.qos, &msg.retained, &msg.id, &receivedTopic, + (unsigned char **)&msg.payload, (int*)&msg.payloadlen, buf, buflen) != 1) + return; + //接受消息 + + deliverMessage(&receivedTopic,&msg,&mqtt_user_msg); + + //消息质量不同,处理不同 + if(msg.qos == QOS0) + { + //QOS0-不需要ACK + //直接处理数据 + UserMsgCtl(&mqtt_user_msg); + return; + } + //发送PUBACK消息 + if(msg.qos == QOS1) + { + len =MQTTSerialize_puback(buf,buflen,mqtt_user_msg.packetid); + if(len == 0) + return; + //发送返回 + if(transport_sendPacketBuffer(buf,len)<0) + return; + //返回后处理消息 + UserMsgCtl(&mqtt_user_msg); + return; + } + + //对于质量2,只需要发送PUBREC就可以了 + if(msg.qos == QOS2) + { + len = MQTTSerialize_ack(buf, buflen, PUBREC, 0, mqtt_user_msg.packetid); + if(len == 0) + return; + //发送返回 + transport_sendPacketBuffer(buf,len); + } + break; + case PUBREL: + //解析包数据,必须包ID相同才可以 + rc = MQTTDeserialize_ack(&msg.type,&msg.dup, &msg.id, buf,buflen); + if((rc != 1)||(msg.type != PUBREL)||(msg.id != mqtt_user_msg.packetid)) + return ; + //收到PUBREL,需要处理并抛弃数据 + if(mqtt_user_msg.valid == 1) + { + lw_print("返回后确定消息"); + //返回后处理消息 + UserMsgCtl(&mqtt_user_msg); + } + //串行化PUBCMP消息 + len = MQTTSerialize_pubcomp(buf,buflen,msg.id); + if(len == 0) + return; + //发送返回--PUBCOMP + transport_sendPacketBuffer(buf,len); + break; + case PUBACK://等级1客户端推送数据后,服务器返回 + break; + case PUBREC://等级2客户端推送数据后,服务器返回 + break; + case PUBCOMP://等级2客户端推送PUBREL后,服务器返回 + break; + default: + break; + } + +} + +/************************************************************************ +** 函数名称: WaitForPacket +** 函数功能: 等待特定的数据包 +** 入口参数: int32_t sock:网络描述符 +** uint8_t packettype:包类型 +** uint8_t times:等待次数 +** 出口参数: >=0:等到了特定的包 <0:没有等到特定的包 +** 备 注: +************************************************************************/ +int32_t WaitForPacket(int32_t sock,uint8_t packettype,uint8_t times) +{ + int32_t type; + //uint8_t buf[MSG_MAX_LEN]; + uint8_t n = 0; + //int32_t buflen = sizeof(buf); + uint8_t *buf=(uint8_t*)x_malloc(MSG_MAX_LEN*sizeof(uint8_t)); + if(buf==NULL){ + printf("申请内存失败:WaitForPacket"); + return -1; + } + int32_t buflen = MSG_MAX_LEN*sizeof(uint8_t); + do + { + //读取数据包 + type = ReadPacketTimeout(sock,buf,buflen,2); + if(type != -1) + { + mqtt_pktype_ctl(type,buf,buflen); + free(mqtt_user_msg.msg); + } + n++; + }while((type != packettype)&&(n < times)); + //收到期望的包 + if(type == packettype) + return 0; + else + return -1; + + free(buf); +} + + + +void ClientConnect(void) +{ + char* host_ip; + +#ifdef LWIP_DNS + ip4_addr_t dns_ip; + netconn_gethostbyname(HOST_NAME, &dns_ip); + host_ip = ip_ntoa(&dns_ip); + printf("host name : %s , host_ip : %s\n",HOST_NAME,host_ip); +#else + host_ip = HOST_NAME; +#endif +MQTT_START: + + //创建网络连接 + lw_print("1.开始连接对应云平台的服务器...\n"); + lw_print("服务器IP地址:%s,端口号:%0d!\n",host_ip,HOST_PORT); + while(1) + { + //连接服务器 + MQTT_socket = transport_open((int8_t*)host_ip,HOST_PORT); + //如果连接服务器成功 + if(MQTT_socket >= 0) + { + printf("连接云平台服务器成功!\n"); + break; + } + printf("连接云平台服务器失败,等待3秒再尝试重新连接!\n"); + //等待3秒 + UserTaskDelay(20000); + //sleep(3); + } + + lw_print("2.MQTT用户名与密钥验证登录...\n"); + //MQTT用户名与密钥验证登录 + if(MQTTConnect() != Connect_OK) + { + //重连服务器 + lw_print("MQTT用户名与密钥验证登录失败...\n"); + //关闭链接 + transport_close(); + goto MQTT_START; + } + + //订阅消息 + printf("3.开始订阅消息...\n"); + //订阅消息 + if(MQTTSubscribe(MQTT_socket,(char *)TOPIC,QOS1) < 0) + { + //重连服务器 + lw_print("客户端订阅消息失败...\n"); + //关闭链接 + transport_close(); + goto MQTT_START; + } + + //无限循环 + lw_print("4.开始循环接收订阅的消息...\n"); + +} + + + + + + + + + +int32_t TestBlockMem(uint8_t* buf, int32_t count) +{ + int idx; + for(int idx=0;idx0) + { + rc=recv(MQTT_socket,block[rcTime],64*1024,0); + if(rc<0){ + return -1; + } + rcTime++; + } + int total_size=0; + //合并数据 + for(int i=0;i0) + { + ret_code=recv(MQTT_socket,buf,buflen,0); + if(ret_time%10==0){ + printf("第几次接收:%d \n",ret_time); + printf("接收大小:%d",ret_code); + } + //ret_code=recv(MQTT_socket,buf,buflen,0); + if(ret_code<0){ + printf("接收剩余消息出错"); + goto CLOSE; + } + ret_time--; + buf[buflen]='\0'; + ret_file=PrivWrite(fd_t,buf,ret_code); + + //printf("剩余消息:%s \n",buf); + } + + PrivClose(fd_t); + printf("接收消息完成"); + goto CLOSE; + } + } + +CLOSE: + free(buf); + //关闭链接 + transport_close(); + //重新链接服务器 + goto MQTT_START; +} + +void *MQTTSend(void*arg){ + int32_t ret_code; + uint8_t no_mqtt_msg_exchange=1; + uint32_t curtick; + uint8_t res; + int32_t type; + struct timeval tv; + + char* test="This is the publish"; + ClientConnect(); + while(1){ + ret_code=MQTTMsgPublish(MQTT_socket,(char*)TOPIC,QOS1,(uint8_t*)test); + if(ret_code>=0){ + no_mqtt_msg_exchange=0; + PrivTaskDelay(500); + } + } +} + +void mqtt_send_thread(){ + int32_t ret_code; + uint8_t no_mqtt_msg_exchange = 1; + uint32_t curtick; + uint8_t res; +} + +void TcpSocketConfigParam(char *ip_str) +{ + int ip1, ip2, ip3, ip4, port = 0; + + if(ip_str == NULL) + return; + + if(sscanf(ip_str, "%d.%d.%d.%d:%d", &ip1, &ip2, &ip3, &ip4, &port)) { + printf("config ip %s port %d\n", ip_str, port); + strcpy(tcp_ip_str, ip_str); + if(port) + tcp_socket_port = port; + return; + } + + if(sscanf(ip_str, "%d.%d.%d.%d", &ip1, &ip2, &ip3, &ip4)) { + printf("config ip %s\n", ip_str); + strcpy(tcp_ip_str, ip_str); + } +} + +void LwipConfig(){ + lwip_config_tcp(0,tcp_demo_ipaddr,tcp_demo_netmask,tcp_demo_gwaddr); +} + +void testTimerFunc(void*arg){ + if(MQTTPingReq(MQTT_socket) < 0) + { + //重连服务器 + printf("发送保持活性ping失败....\n"); + } + //心跳成功 + printf("发送保持活性ping作为心跳成功....\n"); +} + +void TestTimer(){ + char* name="first timer \n"; + int32 ret=KCreateTimer(name,testTimerFunc,0,100,TIMER_TRIGGER_PERIODIC); + KTimerStartRun(ret); +} + + +void TestFile(){ + int ret=0; + char w_buf[]="hello flilesystem"; + char r_buf[20]; + int fd_t=PrivOpen("/hello.txt",O_RDWR|O_CREAT); + if(fd_t>0){ + ret=PrivWrite(fd_t,w_buf,strlen(w_buf)); + if(ret<0){ + printf("fd = %d write hello world failed.\n",fd_t); + return; + } + PrivClose(fd_t); + fd_t = PrivOpen("/hello.txt", O_RDONLY); + ret = PrivRead(fd_t, r_buf, 20); + if(ret > 0){ + printf("read len %d, r_buf %s\n",ret,r_buf); + } + PrivClose(fd_t); + } +} \ No newline at end of file diff --git a/APP_Framework/Applications/app_test/test_mqttclient/mqttclient.h b/APP_Framework/Applications/app_test/test_mqttclient/mqttclient.h new file mode 100644 index 000000000..cd04f1b83 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/mqttclient.h @@ -0,0 +1,218 @@ +#ifndef __MALLOC_H +#define __MALLOC_H + +#include +#include "lwipopts.h" +#include +#include +#include +#define MSG_MAX_LEN 1024 +#define MSG_TOPIC_LEN 50 +#define KEEPLIVE_TIME 50 +#define MQTT_VERSION 4 +#define TIMER_TRIGGER_ONCE (1 << 0) +#define TIMER_TRIGGER_PERIODIC (1 << 1) +#undef LWIP_DNS + +#ifdef LWIP_DNS +#define HOST_NAME "mqtt.heclouds.com" //服务器域名 +#else +#define HOST_NAME "192.168.56.137" //服务器IP地址 +#endif +//#define HOST_NAME "192.168.56.137" + + +//#define HOST_IP "129.204.201.235" +#define HOST_PORT 1883 //由于是TCP连接,端口必须是1883 + +#define CLIENT_ID "518725049" //随机的id +#define USER_NAME "hzw" //用户名 +#define PASSWORD "124578" //秘钥 + +#define TOPIC "temp" //订阅的主题 + +#define TEST_MESSAGE "test_message" //发送测试消息 + +#define BLOCK_NUM 10 //内存池数量 + +#define LWIP_IP_ADDR {'192','168','56','3'} +#define LWIP_NETMASK {'255','255','255','0'} +#define LWIP_GATEWAY {'192','168','56','1'} + +static char tcp_demo_ipaddr[] = {192, 168, 56, 3}; +static char tcp_demo_netmask[] = {255, 255, 255, 0}; +static char tcp_demo_gwaddr[] = {192, 168, 56, 1}; + +static uint16_t tcp_socket_port = 8888; +static char tcp_ip_str[128] = {0}; + +static uint8 *block[BLOCK_NUM]; // 内存池指针 +static GatherMemType memG; + +enum QoS +{ QOS0 = 0, + QOS1, + QOS2 +}; + +enum MQTT_Connect +{ + Connect_OK = 0, + Connect_NOK, + Connect_NOTACK +}; + +//数据交互结构体 +typedef struct __MQTTMessage +{ + uint32_t qos; + uint8_t retained; + uint8_t dup; + uint16_t id; + uint8_t type; + void *payload; + int32_t payloadlen; +}MQTTMessage; + +//用户接收消息结构体 +typedef struct __MQTT_MSG +{ + uint8_t msgqos; //消息质量 + uint8_t *msg; //消息 + uint32_t msglenth; //消息长度 + uint8_t topic[MSG_TOPIC_LEN]; //主题 + uint16_t packetid; //消息ID + uint8_t valid; //标明消息是否有效 +}MQTT_USER_MSG; + +//发送消息结构体 +typedef struct +{ + int8_t topic[MSG_TOPIC_LEN]; + int8_t qos; + int8_t retained; + + uint8_t *msg; + uint8_t msglen; +} mqtt_recv_msg_t, *p_mqtt_recv_msg_t, mqtt_send_msg_t, *p_mqtt_send_msg_t; + + +void mqtt_thread( void *pvParameters); + +/************************************************************************ +** 函数名称: my_mqtt_send_pingreq +** 函数功能: 发送MQTT心跳包 +** 入口参数: 无 +** 出口参数: >=0:发送成功 <0:发送失败 +** 备 注: +************************************************************************/ +int32_t MQTTPingReq(int32_t sock); + +/************************************************************************ +** 函数名称: MQTTConnect +** 函数功能: 登录服务器 +** 入口参数: int32_t sock:网络描述符 +** 出口参数: Connect_OK:登陆成功 其他:登陆失败 +** 备 注: +************************************************************************/ +uint8_t MQTTConnect(void); + +/************************************************************************ +** 函数名称: MQTTSubscribe +** 函数功能: 订阅消息 +** 入口参数: int32_t sock:套接字 +** int8_t *topic:主题 +** enum QoS pos:消息质量 +** 出口参数: >=0:发送成功 <0:发送失败 +** 备 注: +************************************************************************/ +int32_t MQTTSubscribe(int32_t sock,char *topic,enum QoS pos); + +/************************************************************************ +** 函数名称: UserMsgCtl +** 函数功能: 用户消息处理函数 +** 入口参数: MQTT_USER_MSG *msg:消息结构体指针 +** 出口参数: 无 +** 备 注: +************************************************************************/ +void UserMsgCtl(MQTT_USER_MSG *msg); + +/************************************************************************ +** 函数名称: GetNextPackID +** 函数功能: 产生下一个数据包ID +** 入口参数: 无 +** 出口参数: uint16_t packetid:产生的ID +** 备 注: +************************************************************************/ +uint16_t GetNextPackID(void); + +/************************************************************************ +** 函数名称: mqtt_msg_publish +** 函数功能: 用户推送消息 +** 入口参数: MQTT_USER_MSG *msg:消息结构体指针 +** 出口参数: >=0:发送成功 <0:发送失败 +** 备 注: +************************************************************************/ +//int32_t MQTTMsgPublish(int32_t sock, char *topic, int8_t qos, int8_t retained,uint8_t* msg,uint32_t msg_len); +int32_t MQTTMsgPublish(int32_t sock, char *topic, int8_t qos, uint8_t* msg); +/************************************************************************ +** 函数名称: ReadPacketTimeout +** 函数功能: 阻塞读取MQTT数据 +** 入口参数: int32_t sock:网络描述符 +** uint8_t *buf:数据缓存区 +** int32_t buflen:缓冲区大小 +** uint32_t timeout:超时时间--0-表示直接查询,没有数据立即返回 +** 出口参数: -1:错误,其他--包类型 +** 备 注: +************************************************************************/ +int32_t ReadPacketTimeout(int32_t sock,uint8_t *buf,int32_t buflen,uint32_t timeout); + +/************************************************************************ +** 函数名称: mqtt_pktype_ctl +** 函数功能: 根据包类型进行处理 +** 入口参数: uint8_t packtype:包类型 +** 出口参数: 无 +** 备 注: +************************************************************************/ +void mqtt_pktype_ctl(uint8_t packtype,uint8_t *buf,uint32_t buflen); + +/************************************************************************ +** 函数名称: WaitForPacket +** 函数功能: 等待特定的数据包 +** 入口参数: int32_t sock:网络描述符 +** uint8_t packettype:包类型 +** uint8_t times:等待次数 +** 出口参数: >=0:等到了特定的包 <0:没有等到特定的包 +** 备 注: +************************************************************************/ +int32_t WaitForPacket(int32_t sock,uint8_t packettype,uint8_t times); + + +/*测试动态内存池的分块*/ +int32_t TestBlockMem(uint8_t* buf, int32_t count); + +/*接收MQTT消息,是测试的赛题*/ +void *MQTTRecv(void*arg); + +/*发送MQTT报文*/ +void *MQTTSend(void*arg); + +/*封装lwip_config_tcp,使得开发板和主机上的运行在虚拟机上的ip网段一致*/ +void LwipConfig(); + +/*从socketdemo那拿来作为测试*/ +void TcpSocketConfigParam(char *ip_str); + +/*测试写入文件功能*/ +void TestFile(); + +/*测试定时器功能*/ +void TestTimer(); + +/*接收MQTT报文的第一个包。抛弃了MQTT库要求的检查buflen必须大于负载长度,以及必须读取完整的报文。*/ +int32_t MQTTRead(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int)); + +#endif + + + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/readme.md b/APP_Framework/Applications/app_test/test_mqttclient/readme.md new file mode 100644 index 000000000..62a6e12c1 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/readme.md @@ -0,0 +1,67 @@ +## 基于矽璓已实现的Lwip,在ARM上实现MQTT Client订阅功能 + +队伍:oscar2 + +## 1. 简介 +使用paho的标准MQTT库文件,以及使用网上提供的使用了lwip的mqtt文件。mqttclient.c文件为参考网上的代码库。随后修改为连接参数。 + +IP:192.168.56.137 +port:1883 +订阅主题:temp + +lwip设置 +ip:192.168.56.3 +netmask:255.255.255.0 +gateway:192.168.56.1 + + +## 2. 订阅函数说明 + +//数据交互结构体 +typedef struct __MQTTMessage +{ + uint32_t qos; //消息质量 + uint8_t retained; //遗嘱标志 + uint8_t dup; //重复分发标志 + uint16_t id; //数据包id + uint8_t type; //数据包类型 + void *payload; //消息负载 + int32_t payloadlen; //负载长度 +}MQTTMessage; + + +//用户接收消息结构体 +typedef struct __MQTT_MSG +{ + uint8_t msgqos; //消息质量 + uint8_t *msg; //消息 + uint32_t msglenth; //消息长度 + uint8_t topic[MSG_TOPIC_LEN]; //主题 + uint16_t packetid; //消息ID + uint8_t valid; //标明消息是否有效 +}MQTT_USER_MSG; + + +void MQTT_Recv(void *parameter) +用于接收消息的函数。在测试接收消息时,接收数个字符的数据可以的。题目要求的是发送640kb的报文,分为十次接收,每次接收64kb。向paho的接收报文的参数传入transport函数,接收失败。遇到了一些问题。 +第一个问题:mqtt的报文头包含负载长度。本题发送640kb,但每次接收时只接受64kb,那么会导致负载长度和实际接收长度不一致,接收失败。所以需要放弃标准mqtt库提供的函数。重新写一个MQTT_Read函数。分十次的接受一个完整的MQTT报文的方法为,第一个包在接收时,不按照mqtt库要求的检查负载长度和实际接收长度是否一致,将接收的包含报文头的第一个数据包的按照原先的步骤,拆解。剩余的数据包,就可以直接接收。 +第二个问题:640KB超过arm开发板的内存。ShowMemory命令显示413664字节。不到640kb,因此将数据写入文件。 +最后的问题:revc函数的缓冲区似乎没有64kb。使用setsocketopt函数设置缓冲区大小失败。所以一次接收64kb似乎也不大行,测试了多次,结果是缓冲区大小不足。当然可能是因为上我编程的失误。所以直接设置为1kb。分为64次接收。 + +以上问题也可能是我个人的编程错误导致的。 + +## 3. 测试程序说明 +测试了函数的订阅以及接收信息的功能。使用——————命令,该命令首先设置ip和端口,保证主机的ip和开发板的ip的网段一致。连接主机上的emqx服务器,订阅主题"temp"。最后MQTTX向主题发布640KB的消息,开发板接收消息并且写入文件。最后查看写入文件的大小。 + +## 4. 运行结果(##需结合运行测试截图按步骤说明##) + + +![image](Compile.png) +添加Lwip,SD等选项,编译edu-arm32成功。 + +![image](Command.png) +使用MQTTSocketRecv命令,设置IP信息,开启连接,并且订阅主题temp,等待消息发送。 + +![image](Result.png) +MQTTX发送消息后,接收消息,并将输入存入storeMsg.txt文件,接收完成后查看文件大小。 + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/test_mqttclient.c b/APP_Framework/Applications/app_test/test_mqttclient/test_mqttclient.c new file mode 100644 index 000000000..cc7be34ee --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/test_mqttclient.c @@ -0,0 +1,68 @@ +#include +#include "test_mqttclient.h" + +/* +* Copyright (c) 2023 AIIT XUOS Lab +* XiUOS is licensed under Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +* See the Mulan PSL v2 for more details. +*/ + + +void MQTTSendTest(int argc, char *argv[]) +{ + + if(argc >= 2) { + lw_print("lw: [%s] target ip %s\n", __func__, argv[1]); + TcpSocketConfigParam(argv[1]); + } + LwipConfig(); +#ifdef ADD_XIZI_FEATURES + pthread_attr_t attr; + attr.schedparam.sched_priority = LWIP_TCP_DEMO_TASK_PRIO; + attr.stacksize = LWIP_TCP_DEMO_TASK_STACK_SIZE; +#endif +#ifdef ADD_NUTTX_FEATURES + pthread_attr_t attr = PTHREAD_ATTR_INITIALIZER; + attr.priority = LWIP_TCP_DEMO_TASK_PRIO; + attr.stacksize = LWIP_TCP_DEMO_TASK_STACK_SIZE; +#endif + + PrivTaskCreate(&tcp_client_task, &attr, &MQTTSend, NULL); + PrivTaskStartup(&tcp_client_task); +} + + +void MQTTRecvTest(int argc, char *argv[]) +{ + if(argc >= 2) { + lw_print("lw: [%s] target ip %s\n", __func__, argv[1]); + TcpSocketConfigParam(argv[1]); + } + +#ifdef ADD_XIZI_FEATURES + lwip_config_tcp(0, tcp_demo_ipaddr, tcp_demo_netmask, tcp_demo_gwaddr); + + pthread_attr_t attr; + attr.schedparam.sched_priority = LWIP_TCP_DEMO_TASK_PRIO; + attr.stacksize = LWIP_TCP_DEMO_TASK_STACK_SIZE; +#endif + +#ifdef ADD_NUTTX_FEATURES + pthread_attr_t attr = PTHREAD_ATTR_INITIALIZER; + attr.priority = LWIP_TCP_DEMO_TASK_PRIO; + attr.stacksize = LWIP_TCP_DEMO_TASK_STACK_SIZE; +#endif + PrivTaskCreate(&tcp_server_task, &attr, &MQTTRecv, NULL); + PrivTaskStartup(&tcp_server_task); +} + +PRIV_SHELL_CMD_FUNCTION(TestTimer,a Timer test sample, PRIV_SHELL_CMD_MAIN_ATTR); +PRIV_SHELL_CMD_FUNCTION(TestFile,a filesystem test sample using sd card, PRIV_SHELL_CMD_MAIN_ATTR); +PRIV_SHELL_CMD_FUNCTION(MQTTRecvTest, a tcp receive sample, PRIV_SHELL_CMD_MAIN_ATTR); +PRIV_SHELL_CMD_FUNCTION(MQTTSendTest, Implement mqtt_client, PRIV_SHELL_CMD_MAIN_ATTR); \ No newline at end of file diff --git a/APP_Framework/Applications/app_test/test_mqttclient/test_mqttclient.h b/APP_Framework/Applications/app_test/test_mqttclient/test_mqttclient.h new file mode 100644 index 000000000..d8c89eee3 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/test_mqttclient.h @@ -0,0 +1,39 @@ +/* +* Copyright (c) 2023 AIIT XUOS Lab +* XiUOS is licensed under Mulan PSL v2. +* You can use this software according to the terms and conditions of the Mulan PSL v2. +* You may obtain a copy of Mulan PSL v2 at: +* http://license.coscl.org.cn/MulanPSL2 +* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +* See the Mulan PSL v2 for more details. +*/ + +#include "mqttclient.h" +#define LWIP_TCP_DEMO_TASK_PRIO 20 +#define LWIP_TCP_DEMO_TASK_STACK_SIZE 4096 + +#ifdef ADD_XIZI_FEATURES +#include +#include +#include "lwip/sys.h" +#endif + + +#ifdef ADD_NUTTX_FEATURES +#include +#include +#include +#include "stdio.h" +#endif + +static pthread_t tcp_client_task; +static pthread_t tcp_server_task; + +/*测试发送消息*/ +void MQTTSendTest(int argc, char *argv[]); + +/*测试接收消息*/ +void MQTTRecvTest(int argc, char *argv[]); + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/transport.c b/APP_Framework/Applications/app_test/test_mqttclient/transport.c new file mode 100644 index 000000000..654d7f103 --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/transport.c @@ -0,0 +1,103 @@ +#include "transport.h" +#include "lwip/opt.h" +#include "lwip/arch.h" +#include "lwip/api.h" +#include "lwip/inet.h" +#include "lwip/sockets.h" +#include "string.h" + +static int mysock; + +/************************************************************************ +** 函数名称: transport_sendPacketBuffer +** 函数功能: 以TCP方式发送数据 +** 入口参数: unsigned char* buf:数据缓冲区 +** int buflen:数据长度 +** 出口参数: <0发送数据失败 +************************************************************************/ +int32_t transport_sendPacketBuffer( uint8_t* buf, int32_t buflen) +{ + int32_t rc; + rc = write(mysock, buf, buflen); + return rc; + +} + +/************************************************************************ +** 函数名称: transport_getdata +** 函数功能: 以阻塞的方式接收TCP数据 +** 入口参数: unsigned char* buf:数据缓冲区 +** int count:数据长度 +** 出口参数: <=0接收数据失败 +************************************************************************/ +int transport_getdata(unsigned char* buf, int count) +{ + int rc; + //这个函数在这里不阻塞 + rc = recv(mysock, buf, count, 0); + return rc; +} + + + +/************************************************************************ +** 函数名称: transport_open +** 函数功能: 打开一个接口,并且和服务器 建立连接 +** 入口参数: char* servip:服务器域名 +** int port:端口号 +** 出口参数: <0打开连接失败 +************************************************************************/ +int32_t transport_open(int8_t* servip, int32_t port) +{ + int32_t *sock = (int32_t*)&mysock; + int32_t ret; +// int32_t opt; + struct sockaddr_in addr; + + //初始换服务器信息 + memset(&addr,0,sizeof(addr)); + addr.sin_len = sizeof(addr); + addr.sin_family = AF_INET; + //填写服务器端口号 + addr.sin_port = PP_HTONS(port); + //填写服务器IP地址 + addr.sin_addr.s_addr = inet_addr((const char*)servip); + + //创建SOCK + *sock = socket(AF_INET,SOCK_STREAM,0); + //连接服务器 + ret = connect(*sock,(struct sockaddr*)&addr,sizeof(addr)); + if(ret != 0) + { + //关闭链接 + close(*sock); + //连接失败 + return -1; + } + //连接成功,设置缓冲区大小为70kb,测试结果为设置无效 + int nSendBuf=70*1024; + setsockopt(*sock,SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int)); + + //返回套接字 + return *sock; +} + + +/************************************************************************ +** 函数名称: transport_close +** 函数功能: 关闭套接字 +** 入口参数: unsigned char* buf:数据缓冲区 +** int buflen:数据长度 +** 出口参数: <0发送数据失败 +************************************************************************/ +int32_t transport_close(void) +{ + + int rc; +// rc = close(mysock); + rc = shutdown(mysock, SHUT_WR); + rc = recv(mysock, NULL, (size_t)0, 0); + rc = close(mysock); + return rc; +} + diff --git a/APP_Framework/Applications/app_test/test_mqttclient/transport.h b/APP_Framework/Applications/app_test/test_mqttclient/transport.h new file mode 100644 index 000000000..6fb046e7b --- /dev/null +++ b/APP_Framework/Applications/app_test/test_mqttclient/transport.h @@ -0,0 +1,43 @@ +#ifndef __TRANSPORT_H +#define __TRANSPORT_H + +#include + +/************************************************************************ +** 函数名称: transport_sendPacketBuffer +** 函数功能: 以TCP方式发送数据 +** 入口参数: unsigned char* buf:数据缓冲区 +** int buflen:数据长度 +** 出口参数: <0发送数据失败 +************************************************************************/ +int32_t transport_sendPacketBuffer( uint8_t* buf, int32_t buflen); + +/************************************************************************ +** 函数名称: transport_getdata +** 函数功能: 以阻塞的方式接收TCP数据 +** 入口参数: unsigned char* buf:数据缓冲区 +** int count:数据长度 +** 出口参数: <=0接收数据失败 +************************************************************************/ +int transport_getdata(unsigned char* buf, int count); + +/************************************************************************ +** 函数名称: transport_open +** 函数功能: 打开一个接口,并且和服务器 建立连接 +** 入口参数: char* servip:服务器域名 +** int port:端口号 +** 出口参数: <0打开连接失败 +************************************************************************/ +int32_t transport_open(int8_t* servip, int32_t port); + +/************************************************************************ +** 函数名称: transport_close +** 函数功能: 关闭套接字 +** 入口参数: unsigned char* buf:数据缓冲区 +** int buflen:数据长度 +** 出口参数: <0发送数据失败 +************************************************************************/ +int32_t transport_close(void); + + +#endif diff --git a/Ubiquitous/XiZi_IIoT/lib/newlib/time_syscalls.c b/Ubiquitous/XiZi_IIoT/lib/newlib/time_syscalls.c index 10f1efcb6..568be99c7 100644 --- a/Ubiquitous/XiZi_IIoT/lib/newlib/time_syscalls.c +++ b/Ubiquitous/XiZi_IIoT/lib/newlib/time_syscalls.c @@ -16,7 +16,10 @@ time_t time(time_t *t) { - NULL_PARAM_CHECK(t); + if(t==NULL){ + return 0; + } + //NULL_PARAM_CHECK(t); time_t current = 0; #ifdef RESOURCES_RTC