add MQTT-C
This commit is contained in:
parent
2956b5861f
commit
b8a96b40b6
|
@ -0,0 +1,81 @@
|
|||
cmake_minimum_required(VERSION 3.5)
|
||||
project(MQTT-C VERSION 1.1.2 LANGUAGES C)
|
||||
|
||||
# MQTT-C build options
|
||||
option(MQTT_C_OpenSSL_SUPPORT "Build MQTT-C with OpenSSL support?" OFF)
|
||||
option(MQTT_C_MbedTLS_SUPPORT "Build MQTT-C with mbed TLS support?" OFF)
|
||||
option(MQTT_C_EXAMPLES "Build MQTT-C examples?" ON)
|
||||
option(MQTT_C_TESTS "Build MQTT-C tests?" OFF)
|
||||
|
||||
list (APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake)
|
||||
|
||||
# MQTT-C library
|
||||
add_library(mqttc STATIC
|
||||
src/mqtt_pal.c
|
||||
src/mqtt.c
|
||||
)
|
||||
target_include_directories(mqttc PUBLIC include)
|
||||
target_link_libraries(mqttc PUBLIC
|
||||
$<$<C_COMPILER_ID:MSVS>:ws2_32>
|
||||
)
|
||||
|
||||
|
||||
# Configure with OpenSSL support
|
||||
if(MQTT_C_OpenSSL_SUPPORT)
|
||||
find_package(OpenSSL REQUIRED)
|
||||
target_link_libraries(mqttc INTERFACE OpenSSL::SSL)
|
||||
target_compile_definitions(mqttc PUBLIC MQTT_USE_BIO)
|
||||
endif()
|
||||
|
||||
# Configure with mbed TLS support
|
||||
if(MQTT_C_MbedTLS_SUPPORT)
|
||||
find_package(MbedTLS REQUIRED)
|
||||
target_include_directories(mqttc PUBLIC ${MBEDTLS_INCLUDE_DIRS})
|
||||
target_link_libraries(mqttc INTERFACE ${MBEDTLS_LIBRARY})
|
||||
target_compile_definitions(mqttc PUBLIC MQTT_USE_MBEDTLS)
|
||||
endif()
|
||||
|
||||
# Build examples
|
||||
if(MQTT_C_EXAMPLES)
|
||||
find_package(Threads REQUIRED)
|
||||
|
||||
if(MQTT_C_OpenSSL_SUPPORT)
|
||||
add_executable(bio_publisher examples/bio_publisher.c)
|
||||
target_link_libraries(bio_publisher Threads::Threads mqttc)
|
||||
|
||||
add_executable(openssl_publisher examples/openssl_publisher.c)
|
||||
target_link_libraries(openssl_publisher Threads::Threads mqttc)
|
||||
elseif(MQTT_C_MbedTLS_SUPPORT)
|
||||
add_executable(mbedtls_publisher examples/mbedtls_publisher.c)
|
||||
target_link_libraries(mbedtls_publisher Threads::Threads mqttc ${MBEDX509_LIBRARY} ${MBEDCRYPTO_LIBRARY})
|
||||
else()
|
||||
add_executable(simple_publisher examples/simple_publisher.c)
|
||||
target_link_libraries(simple_publisher Threads::Threads mqttc)
|
||||
|
||||
add_executable(simple_subscriber examples/simple_subscriber.c)
|
||||
target_link_libraries(simple_subscriber Threads::Threads mqttc)
|
||||
|
||||
add_executable(reconnect_subscriber examples/reconnect_subscriber.c)
|
||||
target_link_libraries(reconnect_subscriber Threads::Threads mqttc)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
# Build tests
|
||||
if(MQTT_C_TESTS)
|
||||
find_path(CMOCKA_INCLUDE_DIR cmocka.h)
|
||||
find_library(CMOCKA_LIBRARY cmocka)
|
||||
if((NOT CMOCKA_INCLUDE_DIR) OR (NOT CMOCKA_LIBRARY))
|
||||
message(FATAL_ERROR "Failed to find cmocka! Add cmocka's install prefix to CMAKE_PREFIX_PATH to resolve this error.")
|
||||
endif()
|
||||
|
||||
add_executable(tests tests.c)
|
||||
target_link_libraries(tests ${CMOCKA_LIBRARY} mqttc)
|
||||
target_include_directories(tests PRIVATE ${CMOCKA_INCLUDE_DIR})
|
||||
endif()
|
||||
|
||||
# Install includes and library
|
||||
install(TARGETS mqttc
|
||||
DESTINATION lib
|
||||
)
|
||||
install(DIRECTORY include/
|
||||
DESTINATION include)
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,21 @@
|
|||
MIT License
|
||||
|
||||
Copyright (c) 2018 Liam Bindle
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
|
@ -0,0 +1,108 @@
|
|||
<p align="right">
|
||||
<a href="https://github.com/LiamBindle/MQTT-C/stargazers"><img src="https://img.shields.io/github/stars/LiamBindle/MQTT-C.svg?style=social&label=Star" style="margin-left:5em"></a>
|
||||
<a href="https://github.com/LiamBindle/MQTT-C/network/members"><img src="https://img.shields.io/github/forks/LiamBindle/MQTT-C.svg?style=social&label=Fork"></a>
|
||||
</p>
|
||||
|
||||
<p align="center">
|
||||
<img width="70%" src="docs/mqtt-c-logo.png"><br>
|
||||
<a href="https://liambindle.ca/MQTT-C"><img src="https://img.shields.io/badge/docs-passing-brightgreen.svg"></a>
|
||||
<a href="https://github.com/LiamBindle/MQTT-C/issues"><img src="https://img.shields.io/badge/Maintained%3F-yes-green.svg"></a>
|
||||
<a href="https://GitHub.com/LiamBindle/MQTT-C/issues/"><img src="https://img.shields.io/github/issues/LiamBindle/MQTT-C.svg"></a>
|
||||
<a href="https://github.com/LiamBindle/MQTT-C/issues"><img src="https://img.shields.io/github/issues-closed/LiamBindle/MQTT-C.svg"></a>
|
||||
<a href="https://github.com/LiamBindle/MQTT-C/blob/master/LICENSE"><img src="https://img.shields.io/badge/License-MIT-blue.svg"></a>
|
||||
</p>
|
||||
|
||||
#
|
||||
|
||||
MQTT-C is an [MQTT v3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html)
|
||||
client written in C. MQTT is a lightweight publisher-subscriber-based messaging protocol that is
|
||||
commonly used in IoT and networking applications where high-latency and low data-rate links
|
||||
are expected. The purpose of MQTT-C is to provide a **portable** MQTT client, **written in C**,
|
||||
for embedded systems and PC's alike. MQTT-C does this by providing a transparent Platform
|
||||
Abstraction Layer (PAL) which makes porting to new platforms easy. MQTT-C is completely
|
||||
thread-safe but can also run perfectly fine on single-threaded systems making MQTT-C
|
||||
well-suited for embedded systems and microcontrollers. Finally, MQTT-C is small; there are only
|
||||
two source files totalling less than 2000 lines.
|
||||
|
||||
#### A note from the author
|
||||
It's been great to hear about all the places MQTT-C is being used! Please don't hesitate
|
||||
to get in touch with me or submit issues on GitHub!
|
||||
|
||||
## Getting Started
|
||||
To use MQTT-C you first instantiate a `struct mqtt_client` and initialize it by calling
|
||||
@ref mqtt_init.
|
||||
```c
|
||||
struct mqtt_client client; /* instantiate the client */
|
||||
mqtt_init(&client, ...); /* initialize the client */
|
||||
```
|
||||
Once your client is initialized you need to connect to an MQTT broker.
|
||||
```c
|
||||
mqtt_connect(&client, ...); /* send a connection request to the broker. */
|
||||
```
|
||||
At this point the client is ready to use! For example, we can subscribe to a topic like so:
|
||||
```c
|
||||
/* subscribe to "toaster/temperature" with a max QoS level of 0 */
|
||||
mqtt_subscribe(&client, "toaster/temperature", 0);
|
||||
```
|
||||
And we can publish to a topic like so:
|
||||
```c
|
||||
/* publish coffee temperature with a QoS level of 1 */
|
||||
int temperature = 67;
|
||||
mqtt_publish(&client, "coffee/temperature", &temperature, sizeof(int), MQTT_PUBLISH_QOS_1);
|
||||
```
|
||||
Those are the basics! From here the [examples](https://github.com/LiamBindle/MQTT-C/tree/master/examples) and [API documentation](https://liambindle.ca/MQTT-C/group__api.html) are good places to get started.
|
||||
|
||||
## Building
|
||||
There are **only two source files** that need to be built, `mqtt.c` and `mqtt_pal.c`.
|
||||
These files are ANSI C (C89) compatible, and should compile with any C compiler.
|
||||
|
||||
Then, simply <code>\#include <mqtt.h></code>.
|
||||
|
||||
Alternatively, you can build MQTT-C with CMake or the provided Makefile. These are provided for convenience.
|
||||
|
||||
## Documentation
|
||||
Pre-built documentation can be found here: [https://liambindle.ca/MQTT-C](https://liambindle.ca/MQTT-C). Be sure to check out the [examples](https://github.com/LiamBindle/MQTT-C/tree/master/examples) too.
|
||||
|
||||
The @ref api documentation contains all the documentation application programmers should need.
|
||||
The @ref pal documentation contains everything you should need to port MQTT-C to a new platform,
|
||||
and the other modules contain documentation for MQTT-C developers.
|
||||
|
||||
## Testing and Building the Tests
|
||||
The MQTT-C unit tests use the [cmocka unit testing framework](https://cmocka.org/).
|
||||
Therefore, [cmocka](https://cmocka.org/) *must* be installed on your machine to build and run
|
||||
the unit tests. For convenience, a simple `"makefile"` is included to build the unit tests and
|
||||
examples on UNIX-like machines. The unit tests and examples can be built as follows:
|
||||
```bash
|
||||
$ make all
|
||||
```
|
||||
The unit tests and examples will be built in the `"bin/"` directory. The unit tests can be run
|
||||
like so:
|
||||
```bash
|
||||
$ ./bin/tests [address [port]]
|
||||
```
|
||||
Note that the \c address and \c port arguments are both optional to specify the location of the
|
||||
MQTT broker that is to be used for the tests. If no \c address is given then the
|
||||
[Mosquitto MQTT Test Server](https://test.mosquitto.org/) will be used. If no \c port is given,
|
||||
port 1883 will be used.
|
||||
|
||||
## Portability
|
||||
MQTT-C provides a transparent platform abstraction layer (PAL) in `mqtt_pal.h` and `mqtt_pal.c`.
|
||||
These files declare and implement the types and calls that MQTT-C requires. Refer to
|
||||
@ref pal for the complete documentation of the PAL.
|
||||
|
||||
## Contributing
|
||||
Please feel free to submit issues and pull-requests [here](https://github.com/LiamBindle/MQTT-C).
|
||||
When submitting a pull-request please ensure you have *fully documented* your changes and
|
||||
added the appropriate unit tests.
|
||||
|
||||
|
||||
## License
|
||||
This project is licensed under the [MIT License](https://opensource.org/licenses/MIT). See the
|
||||
`"LICENSE"` file for more details.
|
||||
|
||||
## Authors
|
||||
MQTT-C was initially developed as a CMPT 434 (Winter Term, 2018) final project at the University of
|
||||
Saskatchewan by:
|
||||
- **Liam Bindle**
|
||||
- **Demilade Adeoye**
|
||||
|
|
@ -0,0 +1,156 @@
|
|||
|
||||
/**
|
||||
* @file
|
||||
* A simple program to that publishes the current time whenever ENTER is pressed.
|
||||
*/
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include <mqtt.h>
|
||||
#include "templates/bio_sockets.h"
|
||||
|
||||
|
||||
/**
|
||||
* @brief The function that would be called whenever a PUBLISH is received.
|
||||
*
|
||||
* @note This function is not used in this example.
|
||||
*/
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published);
|
||||
|
||||
/**
|
||||
* @brief The client's refresher. This function triggers back-end routines to
|
||||
* handle ingress/egress traffic to the broker.
|
||||
*
|
||||
* @note All this function needs to do is call \ref __mqtt_recv and
|
||||
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
|
||||
* client ingress/egress traffic will be handled every 100 ms.
|
||||
*/
|
||||
void* client_refresher(void* client);
|
||||
|
||||
/**
|
||||
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
|
||||
*/
|
||||
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon);
|
||||
|
||||
/**
|
||||
* A simple program to that publishes the current time whenever ENTER is pressed.
|
||||
*/
|
||||
int main(int argc, const char *argv[])
|
||||
{
|
||||
const char* addr;
|
||||
const char* port;
|
||||
const char* topic;
|
||||
|
||||
/* Load OpenSSL */
|
||||
SSL_load_error_strings();
|
||||
ERR_load_BIO_strings();
|
||||
OpenSSL_add_all_algorithms();
|
||||
|
||||
/* get address (argv[1] if present) */
|
||||
if (argc > 1) {
|
||||
addr = argv[1];
|
||||
} else {
|
||||
addr = "test.mosquitto.org";
|
||||
}
|
||||
|
||||
/* get port number (argv[2] if present) */
|
||||
if (argc > 2) {
|
||||
port = argv[2];
|
||||
} else {
|
||||
port = "1883";
|
||||
}
|
||||
|
||||
/* get the topic name to publish */
|
||||
if (argc > 3) {
|
||||
topic = argv[3];
|
||||
} else {
|
||||
topic = "datetime";
|
||||
}
|
||||
|
||||
/* open the non-blocking TCP socket (connecting to the broker) */
|
||||
BIO* sockfd = open_nb_socket(addr, port);
|
||||
|
||||
if (sockfd == NULL) {
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
}
|
||||
|
||||
/* setup a client */
|
||||
struct mqtt_client client;
|
||||
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
|
||||
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
|
||||
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
|
||||
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
|
||||
|
||||
/* check that we don't have any errors */
|
||||
if (client.error != MQTT_OK) {
|
||||
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
}
|
||||
|
||||
/* start a thread to refresh the client (handle egress and ingree client traffic) */
|
||||
pthread_t client_daemon;
|
||||
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
|
||||
fprintf(stderr, "Failed to start client daemon.\n");
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
|
||||
}
|
||||
|
||||
/* start publishing the time */
|
||||
printf("%s is ready to begin publishing the time.\n", argv[0]);
|
||||
printf("Press ENTER to publish the current time.\n");
|
||||
printf("Press CTRL-D (or any other key) to exit.\n\n");
|
||||
while(fgetc(stdin) == '\n') {
|
||||
/* get the current time */
|
||||
time_t timer;
|
||||
time(&timer);
|
||||
struct tm* tm_info = localtime(&timer);
|
||||
char timebuf[26];
|
||||
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
|
||||
|
||||
/* print a message */
|
||||
char application_message[256];
|
||||
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
|
||||
printf("%s published : \"%s\"", argv[0], application_message);
|
||||
|
||||
/* publish the time */
|
||||
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2);
|
||||
|
||||
/* check for errors */
|
||||
if (client.error != MQTT_OK) {
|
||||
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
|
||||
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
|
||||
}
|
||||
}
|
||||
|
||||
/* disconnect */
|
||||
printf("\n%s disconnecting from %s\n", argv[0], addr);
|
||||
sleep(1);
|
||||
|
||||
/* exit */
|
||||
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
|
||||
}
|
||||
|
||||
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon)
|
||||
{
|
||||
if (sockfd != NULL) BIO_free_all(sockfd);
|
||||
if (client_daemon != NULL) pthread_cancel(*client_daemon);
|
||||
exit(status);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published)
|
||||
{
|
||||
/* not used in this example */
|
||||
}
|
||||
|
||||
void* client_refresher(void* client)
|
||||
{
|
||||
while(1)
|
||||
{
|
||||
mqtt_sync((struct mqtt_client*) client);
|
||||
usleep(100000U);
|
||||
}
|
||||
return NULL;
|
||||
}
|
|
@ -0,0 +1,163 @@
|
|||
|
||||
/**
|
||||
* @file
|
||||
*/
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include <mqtt.h>
|
||||
#include "templates/mbedtls_sockets.h"
|
||||
|
||||
|
||||
/**
|
||||
* @brief The function that would be called whenever a PUBLISH is received.
|
||||
*
|
||||
* @note This function is not used in this example.
|
||||
*/
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published);
|
||||
|
||||
/**
|
||||
* @brief The client's refresher. This function triggers back-end routines to
|
||||
* handle ingress/egress traffic to the broker.
|
||||
*
|
||||
* @note All this function needs to do is call \ref __mqtt_recv and
|
||||
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
|
||||
* client ingress/egress traffic will be handled every 100 ms.
|
||||
*/
|
||||
void* client_refresher(void* client);
|
||||
|
||||
/**
|
||||
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
|
||||
*/
|
||||
void exit_example(int status, mqtt_pal_socket_handle sockfd, pthread_t *client_daemon);
|
||||
|
||||
/**
|
||||
* A simple program to that publishes the current time whenever ENTER is pressed.
|
||||
*/
|
||||
int main(int argc, const char *argv[])
|
||||
{
|
||||
const char* addr;
|
||||
const char* port;
|
||||
const char* topic;
|
||||
const char* ca_file;
|
||||
|
||||
struct mbedtls_context ctx;
|
||||
mqtt_pal_socket_handle sockfd;
|
||||
|
||||
if (argc > 1) {
|
||||
ca_file = argv[1];
|
||||
} else {
|
||||
printf("error: path to the CA certificate to use\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* get address (argv[2] if present) */
|
||||
if (argc > 2) {
|
||||
addr = argv[2];
|
||||
} else {
|
||||
addr = "test.mosquitto.org";
|
||||
}
|
||||
|
||||
/* get port number (argv[3] if present) */
|
||||
if (argc > 3) {
|
||||
port = argv[3];
|
||||
} else {
|
||||
port = "8883";
|
||||
}
|
||||
|
||||
/* get the topic name to publish */
|
||||
if (argc > 4) {
|
||||
topic = argv[4];
|
||||
} else {
|
||||
topic = "datetime";
|
||||
}
|
||||
|
||||
/* open the non-blocking TCP socket (connecting to the broker) */
|
||||
open_nb_socket(&ctx, addr, port, ca_file);
|
||||
sockfd = &ctx.ssl_ctx;
|
||||
|
||||
if (sockfd == NULL) {
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
}
|
||||
|
||||
/* setup a client */
|
||||
struct mqtt_client client;
|
||||
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
|
||||
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
|
||||
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
|
||||
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
|
||||
|
||||
/* check that we don't have any errors */
|
||||
if (client.error != MQTT_OK) {
|
||||
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
}
|
||||
|
||||
/* start a thread to refresh the client (handle egress and ingree client traffic) */
|
||||
pthread_t client_daemon;
|
||||
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
|
||||
fprintf(stderr, "Failed to start client daemon.\n");
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
|
||||
}
|
||||
|
||||
/* start publishing the time */
|
||||
printf("%s is ready to begin publishing the time.\n", argv[0]);
|
||||
printf("Press ENTER to publish the current time.\n");
|
||||
printf("Press CTRL-D (or any other key) to exit.\n\n");
|
||||
while(fgetc(stdin) == '\n') {
|
||||
/* get the current time */
|
||||
time_t timer;
|
||||
time(&timer);
|
||||
struct tm* tm_info = localtime(&timer);
|
||||
char timebuf[26];
|
||||
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
|
||||
|
||||
/* print a message */
|
||||
char application_message[256];
|
||||
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
|
||||
printf("%s published : \"%s\"", argv[0], application_message);
|
||||
|
||||
/* publish the time */
|
||||
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2);
|
||||
|
||||
/* check for errors */
|
||||
if (client.error != MQTT_OK) {
|
||||
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
|
||||
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
|
||||
}
|
||||
}
|
||||
|
||||
/* disconnect */
|
||||
printf("\n%s disconnecting from %s\n", argv[0], addr);
|
||||
sleep(1);
|
||||
|
||||
/* exit */
|
||||
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
|
||||
}
|
||||
|
||||
void exit_example(int status, mqtt_pal_socket_handle sockfd, pthread_t *client_daemon)
|
||||
{
|
||||
if (client_daemon != NULL) pthread_cancel(*client_daemon);
|
||||
mbedtls_ssl_free(sockfd);
|
||||
/* XXX free the rest of contexts */
|
||||
exit(status);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published)
|
||||
{
|
||||
/* not used in this example */
|
||||
}
|
||||
|
||||
void* client_refresher(void* client)
|
||||
{
|
||||
while(1)
|
||||
{
|
||||
mqtt_sync((struct mqtt_client*) client);
|
||||
usleep(100000U);
|
||||
}
|
||||
return NULL;
|
||||
}
|
|
@ -0,0 +1,167 @@
|
|||
|
||||
/**
|
||||
* @file
|
||||
*/
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include <mqtt.h>
|
||||
#include "templates/openssl_sockets.h"
|
||||
|
||||
|
||||
/**
|
||||
* @brief The function that would be called whenever a PUBLISH is received.
|
||||
*
|
||||
* @note This function is not used in this example.
|
||||
*/
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published);
|
||||
|
||||
/**
|
||||
* @brief The client's refresher. This function triggers back-end routines to
|
||||
* handle ingress/egress traffic to the broker.
|
||||
*
|
||||
* @note All this function needs to do is call \ref __mqtt_recv and
|
||||
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
|
||||
* client ingress/egress traffic will be handled every 100 ms.
|
||||
*/
|
||||
void* client_refresher(void* client);
|
||||
|
||||
/**
|
||||
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
|
||||
*/
|
||||
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon);
|
||||
|
||||
/**
|
||||
* A simple program to that publishes the current time whenever ENTER is pressed.
|
||||
*/
|
||||
int main(int argc, const char *argv[])
|
||||
{
|
||||
const char* addr;
|
||||
const char* port;
|
||||
const char* topic;
|
||||
const char* ca_file;
|
||||
|
||||
/* Load OpenSSL */
|
||||
SSL_load_error_strings();
|
||||
ERR_load_BIO_strings();
|
||||
OpenSSL_add_all_algorithms();
|
||||
SSL_library_init();
|
||||
|
||||
SSL_CTX* ssl_ctx;
|
||||
BIO* sockfd;
|
||||
|
||||
if (argc > 1) {
|
||||
ca_file = argv[1];
|
||||
} else {
|
||||
printf("error: path to the CA certificate to use\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* get address (argv[2] if present) */
|
||||
if (argc > 2) {
|
||||
addr = argv[2];
|
||||
} else {
|
||||
addr = "test.mosquitto.org";
|
||||
}
|
||||
|
||||
/* get port number (argv[3] if present) */
|
||||
if (argc > 3) {
|
||||
port = argv[3];
|
||||
} else {
|
||||
port = "8883";
|
||||
}
|
||||
|
||||
/* get the topic name to publish */
|
||||
if (argc > 4) {
|
||||
topic = argv[4];
|
||||
} else {
|
||||
topic = "datetime";
|
||||
}
|
||||
|
||||
/* open the non-blocking TCP socket (connecting to the broker) */
|
||||
open_nb_socket(&sockfd, &ssl_ctx, addr, port, ca_file, NULL);
|
||||
|
||||
if (sockfd == NULL) {
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
}
|
||||
|
||||
/* setup a client */
|
||||
struct mqtt_client client;
|
||||
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
|
||||
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
|
||||
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
|
||||
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
|
||||
|
||||
/* check that we don't have any errors */
|
||||
if (client.error != MQTT_OK) {
|
||||
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
}
|
||||
|
||||
/* start a thread to refresh the client (handle egress and ingree client traffic) */
|
||||
pthread_t client_daemon;
|
||||
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
|
||||
fprintf(stderr, "Failed to start client daemon.\n");
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
|
||||
}
|
||||
|
||||
/* start publishing the time */
|
||||
printf("%s is ready to begin publishing the time.\n", argv[0]);
|
||||
printf("Press ENTER to publish the current time.\n");
|
||||
printf("Press CTRL-D (or any other key) to exit.\n\n");
|
||||
while(fgetc(stdin) == '\n') {
|
||||
/* get the current time */
|
||||
time_t timer;
|
||||
time(&timer);
|
||||
struct tm* tm_info = localtime(&timer);
|
||||
char timebuf[26];
|
||||
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
|
||||
|
||||
/* print a message */
|
||||
char application_message[256];
|
||||
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
|
||||
printf("%s published : \"%s\"", argv[0], application_message);
|
||||
|
||||
/* publish the time */
|
||||
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2);
|
||||
|
||||
/* check for errors */
|
||||
if (client.error != MQTT_OK) {
|
||||
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
|
||||
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
|
||||
}
|
||||
}
|
||||
|
||||
/* disconnect */
|
||||
printf("\n%s disconnecting from %s\n", argv[0], addr);
|
||||
sleep(1);
|
||||
|
||||
/* exit */
|
||||
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
|
||||
}
|
||||
|
||||
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon)
|
||||
{
|
||||
if (sockfd != NULL) BIO_free_all(sockfd);
|
||||
if (client_daemon != NULL) pthread_cancel(*client_daemon);
|
||||
exit(status);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published)
|
||||
{
|
||||
/* not used in this example */
|
||||
}
|
||||
|
||||
void* client_refresher(void* client)
|
||||
{
|
||||
while(1)
|
||||
{
|
||||
mqtt_sync((struct mqtt_client*) client);
|
||||
usleep(100000U);
|
||||
}
|
||||
return NULL;
|
||||
}
|
|
@ -0,0 +1,199 @@
|
|||
|
||||
/**
|
||||
* @file
|
||||
* A simple subscriber program that performs automatic reconnections.
|
||||
*/
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include <mqtt.h>
|
||||
#include "templates/posix_sockets.h"
|
||||
|
||||
/**
|
||||
* @brief A structure that I will use to keep track of some data needed
|
||||
* to setup the connection to the broker.
|
||||
*
|
||||
* An instance of this struct will be created in my \c main(). Then, whenever
|
||||
* \ref reconnect_client is called, this instance will be passed.
|
||||
*/
|
||||
struct reconnect_state_t {
|
||||
const char* hostname;
|
||||
const char* port;
|
||||
const char* topic;
|
||||
uint8_t* sendbuf;
|
||||
size_t sendbufsz;
|
||||
uint8_t* recvbuf;
|
||||
size_t recvbufsz;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* @brief My reconnect callback. It will reestablish the connection whenever
|
||||
* an error occurs.
|
||||
*/
|
||||
void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr);
|
||||
|
||||
/**
|
||||
* @brief The function will be called whenever a PUBLISH message is received.
|
||||
*/
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published);
|
||||
|
||||
/**
|
||||
* @brief The client's refresher. This function triggers back-end routines to
|
||||
* handle ingress/egress traffic to the broker.
|
||||
*
|
||||
* @note All this function needs to do is call \ref __mqtt_recv and
|
||||
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
|
||||
* client ingress/egress traffic will be handled every 100 ms.
|
||||
*/
|
||||
void* client_refresher(void* client);
|
||||
|
||||
/**
|
||||
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
|
||||
*/
|
||||
void exit_example(int status, int sockfd, pthread_t *client_daemon);
|
||||
|
||||
|
||||
int main(int argc, const char *argv[])
|
||||
{
|
||||
const char* addr;
|
||||
const char* port;
|
||||
const char* topic;
|
||||
|
||||
/* get address (argv[1] if present) */
|
||||
if (argc > 1) {
|
||||
addr = argv[1];
|
||||
} else {
|
||||
addr = "test.mosquitto.org";
|
||||
}
|
||||
|
||||
/* get port number (argv[2] if present) */
|
||||
if (argc > 2) {
|
||||
port = argv[2];
|
||||
} else {
|
||||
port = "1883";
|
||||
}
|
||||
|
||||
/* get the topic name to publish */
|
||||
if (argc > 3) {
|
||||
topic = argv[3];
|
||||
} else {
|
||||
topic = "datetime";
|
||||
}
|
||||
|
||||
/* build the reconnect_state structure which will be passed to reconnect */
|
||||
struct reconnect_state_t reconnect_state;
|
||||
reconnect_state.hostname = addr;
|
||||
reconnect_state.port = port;
|
||||
reconnect_state.topic = topic;
|
||||
uint8_t sendbuf[2048];
|
||||
uint8_t recvbuf[1024];
|
||||
reconnect_state.sendbuf = sendbuf;
|
||||
reconnect_state.sendbufsz = sizeof(sendbuf);
|
||||
reconnect_state.recvbuf = recvbuf;
|
||||
reconnect_state.recvbufsz = sizeof(recvbuf);
|
||||
|
||||
/* setup a client */
|
||||
struct mqtt_client client;
|
||||
|
||||
mqtt_init_reconnect(&client,
|
||||
reconnect_client, &reconnect_state,
|
||||
publish_callback
|
||||
);
|
||||
|
||||
/* start a thread to refresh the client (handle egress and ingree client traffic) */
|
||||
pthread_t client_daemon;
|
||||
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
|
||||
fprintf(stderr, "Failed to start client daemon.\n");
|
||||
exit_example(EXIT_FAILURE, -1, NULL);
|
||||
|
||||
}
|
||||
|
||||
/* start publishing the time */
|
||||
printf("%s listening for '%s' messages.\n", argv[0], topic);
|
||||
printf("Press ENTER to inject an error.\n");
|
||||
printf("Press CTRL-D to exit.\n\n");
|
||||
|
||||
/* block */
|
||||
while(fgetc(stdin) != EOF) {
|
||||
printf("Injecting error: \"MQTT_ERROR_SOCKET_ERROR\"\n");
|
||||
client.error = MQTT_ERROR_SOCKET_ERROR;
|
||||
}
|
||||
|
||||
/* disconnect */
|
||||
printf("\n%s disconnecting from %s\n", argv[0], addr);
|
||||
sleep(1);
|
||||
|
||||
/* exit */
|
||||
exit_example(EXIT_SUCCESS, client.socketfd, &client_daemon);
|
||||
}
|
||||
|
||||
void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr)
|
||||
{
|
||||
struct reconnect_state_t *reconnect_state = *((struct reconnect_state_t**) reconnect_state_vptr);
|
||||
|
||||
/* Close the clients socket if this isn't the initial reconnect call */
|
||||
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
|
||||
close(client->socketfd);
|
||||
}
|
||||
|
||||
/* Perform error handling here. */
|
||||
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
|
||||
printf("reconnect_client: called while client was in error state \"%s\"\n",
|
||||
mqtt_error_str(client->error)
|
||||
);
|
||||
}
|
||||
|
||||
/* Open a new socket. */
|
||||
int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port);
|
||||
if (sockfd == -1) {
|
||||
perror("Failed to open socket: ");
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
}
|
||||
|
||||
/* Reinitialize the client. */
|
||||
mqtt_reinit(client, sockfd,
|
||||
reconnect_state->sendbuf, reconnect_state->sendbufsz,
|
||||
reconnect_state->recvbuf, reconnect_state->recvbufsz
|
||||
);
|
||||
|
||||
/* Create an anonymous session */
|
||||
const char* client_id = NULL;
|
||||
/* Ensure we have a clean session */
|
||||
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
|
||||
/* Send connection request to the broker. */
|
||||
mqtt_connect(client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);
|
||||
|
||||
/* Subscribe to the topic. */
|
||||
mqtt_subscribe(client, reconnect_state->topic, 0);
|
||||
}
|
||||
|
||||
void exit_example(int status, int sockfd, pthread_t *client_daemon)
|
||||
{
|
||||
if (sockfd != -1) close(sockfd);
|
||||
if (client_daemon != NULL) pthread_cancel(*client_daemon);
|
||||
exit(status);
|
||||
}
|
||||
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published)
|
||||
{
|
||||
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
|
||||
char* topic_name = (char*) malloc(published->topic_name_size + 1);
|
||||
memcpy(topic_name, published->topic_name, published->topic_name_size);
|
||||
topic_name[published->topic_name_size] = '\0';
|
||||
|
||||
printf("Received publish('%s'): %s\n", topic_name, (const char*) published->application_message);
|
||||
|
||||
free(topic_name);
|
||||
}
|
||||
|
||||
void* client_refresher(void* client)
|
||||
{
|
||||
while(1)
|
||||
{
|
||||
mqtt_sync((struct mqtt_client*) client);
|
||||
usleep(100000U);
|
||||
}
|
||||
return NULL;
|
||||
}
|
|
@ -0,0 +1,157 @@
|
|||
|
||||
/**
|
||||
* @file
|
||||
* A simple program to that publishes the current time whenever ENTER is pressed.
|
||||
*/
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include <mqtt.h>
|
||||
#include "templates/posix_sockets.h"
|
||||
|
||||
|
||||
/**
|
||||
* @brief The function that would be called whenever a PUBLISH is received.
|
||||
*
|
||||
* @note This function is not used in this example.
|
||||
*/
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published);
|
||||
|
||||
/**
|
||||
* @brief The client's refresher. This function triggers back-end routines to
|
||||
* handle ingress/egress traffic to the broker.
|
||||
*
|
||||
* @note All this function needs to do is call \ref __mqtt_recv and
|
||||
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
|
||||
* client ingress/egress traffic will be handled every 100 ms.
|
||||
*/
|
||||
void* client_refresher(void* client);
|
||||
|
||||
/**
|
||||
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
|
||||
*/
|
||||
void exit_example(int status, int sockfd, pthread_t *client_daemon);
|
||||
|
||||
/**
|
||||
* A simple program to that publishes the current time whenever ENTER is pressed.
|
||||
*/
|
||||
int main(int argc, const char *argv[])
|
||||
{
|
||||
const char* addr;
|
||||
const char* port;
|
||||
const char* topic;
|
||||
|
||||
/* get address (argv[1] if present) */
|
||||
if (argc > 1) {
|
||||
addr = argv[1];
|
||||
} else {
|
||||
addr = "test.mosquitto.org";
|
||||
}
|
||||
|
||||
/* get port number (argv[2] if present) */
|
||||
if (argc > 2) {
|
||||
port = argv[2];
|
||||
} else {
|
||||
port = "1883";
|
||||
}
|
||||
|
||||
/* get the topic name to publish */
|
||||
if (argc > 3) {
|
||||
topic = argv[3];
|
||||
} else {
|
||||
topic = "datetime";
|
||||
}
|
||||
|
||||
/* open the non-blocking TCP socket (connecting to the broker) */
|
||||
int sockfd = open_nb_socket(addr, port);
|
||||
|
||||
if (sockfd == -1) {
|
||||
perror("Failed to open socket: ");
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
}
|
||||
|
||||
/* setup a client */
|
||||
struct mqtt_client client;
|
||||
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
|
||||
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
|
||||
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
|
||||
/* Create an anonymous session */
|
||||
const char* client_id = NULL;
|
||||
/* Ensure we have a clean session */
|
||||
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
|
||||
/* Send connection request to the broker. */
|
||||
mqtt_connect(&client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);
|
||||
|
||||
/* check that we don't have any errors */
|
||||
if (client.error != MQTT_OK) {
|
||||
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
}
|
||||
|
||||
/* start a thread to refresh the client (handle egress and ingree client traffic) */
|
||||
pthread_t client_daemon;
|
||||
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
|
||||
fprintf(stderr, "Failed to start client daemon.\n");
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
|
||||
}
|
||||
|
||||
/* start publishing the time */
|
||||
printf("%s is ready to begin publishing the time.\n", argv[0]);
|
||||
printf("Press ENTER to publish the current time.\n");
|
||||
printf("Press CTRL-D (or any other key) to exit.\n\n");
|
||||
while(fgetc(stdin) == '\n') {
|
||||
/* get the current time */
|
||||
time_t timer;
|
||||
time(&timer);
|
||||
struct tm* tm_info = localtime(&timer);
|
||||
char timebuf[26];
|
||||
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
|
||||
|
||||
/* print a message */
|
||||
char application_message[256];
|
||||
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
|
||||
printf("%s published : \"%s\"", argv[0], application_message);
|
||||
|
||||
/* publish the time */
|
||||
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_0);
|
||||
|
||||
/* check for errors */
|
||||
if (client.error != MQTT_OK) {
|
||||
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
|
||||
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
|
||||
}
|
||||
}
|
||||
|
||||
/* disconnect */
|
||||
printf("\n%s disconnecting from %s\n", argv[0], addr);
|
||||
sleep(1);
|
||||
|
||||
/* exit */
|
||||
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
|
||||
}
|
||||
|
||||
void exit_example(int status, int sockfd, pthread_t *client_daemon)
|
||||
{
|
||||
if (sockfd != -1) close(sockfd);
|
||||
if (client_daemon != NULL) pthread_cancel(*client_daemon);
|
||||
exit(status);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published)
|
||||
{
|
||||
/* not used in this example */
|
||||
}
|
||||
|
||||
void* client_refresher(void* client)
|
||||
{
|
||||
while(1)
|
||||
{
|
||||
mqtt_sync((struct mqtt_client*) client);
|
||||
usleep(100000U);
|
||||
}
|
||||
return NULL;
|
||||
}
|
|
@ -0,0 +1,142 @@
|
|||
|
||||
/**
|
||||
* @file
|
||||
* A simple program that subscribes to a topic.
|
||||
*/
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include <mqtt.h>
|
||||
#include "templates/posix_sockets.h"
|
||||
|
||||
|
||||
/**
|
||||
* @brief The function will be called whenever a PUBLISH message is received.
|
||||
*/
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published);
|
||||
|
||||
/**
|
||||
* @brief The client's refresher. This function triggers back-end routines to
|
||||
* handle ingress/egress traffic to the broker.
|
||||
*
|
||||
* @note All this function needs to do is call \ref __mqtt_recv and
|
||||
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
|
||||
* client ingress/egress traffic will be handled every 100 ms.
|
||||
*/
|
||||
void* client_refresher(void* client);
|
||||
|
||||
/**
|
||||
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
|
||||
*/
|
||||
void exit_example(int status, int sockfd, pthread_t *client_daemon);
|
||||
|
||||
int main(int argc, const char *argv[])
|
||||
{
|
||||
const char* addr;
|
||||
const char* port;
|
||||
const char* topic;
|
||||
|
||||
/* get address (argv[1] if present) */
|
||||
if (argc > 1) {
|
||||
addr = argv[1];
|
||||
} else {
|
||||
addr = "test.mosquitto.org";
|
||||
}
|
||||
|
||||
/* get port number (argv[2] if present) */
|
||||
if (argc > 2) {
|
||||
port = argv[2];
|
||||
} else {
|
||||
port = "1883";
|
||||
}
|
||||
|
||||
/* get the topic name to publish */
|
||||
if (argc > 3) {
|
||||
topic = argv[3];
|
||||
} else {
|
||||
topic = "datetime";
|
||||
}
|
||||
|
||||
/* open the non-blocking TCP socket (connecting to the broker) */
|
||||
int sockfd = open_nb_socket(addr, port);
|
||||
|
||||
if (sockfd == -1) {
|
||||
perror("Failed to open socket: ");
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
}
|
||||
|
||||
/* setup a client */
|
||||
struct mqtt_client client;
|
||||
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
|
||||
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
|
||||
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
|
||||
/* Create an anonymous session */
|
||||
const char* client_id = NULL;
|
||||
/* Ensure we have a clean session */
|
||||
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
|
||||
/* Send connection request to the broker. */
|
||||
mqtt_connect(&client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);
|
||||
|
||||
/* check that we don't have any errors */
|
||||
if (client.error != MQTT_OK) {
|
||||
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
}
|
||||
|
||||
/* start a thread to refresh the client (handle egress and ingree client traffic) */
|
||||
pthread_t client_daemon;
|
||||
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
|
||||
fprintf(stderr, "Failed to start client daemon.\n");
|
||||
exit_example(EXIT_FAILURE, sockfd, NULL);
|
||||
|
||||
}
|
||||
|
||||
/* subscribe */
|
||||
mqtt_subscribe(&client, topic, 0);
|
||||
|
||||
/* start publishing the time */
|
||||
printf("%s listening for '%s' messages.\n", argv[0], topic);
|
||||
printf("Press CTRL-D to exit.\n\n");
|
||||
|
||||
/* block */
|
||||
while(fgetc(stdin) != EOF);
|
||||
|
||||
/* disconnect */
|
||||
printf("\n%s disconnecting from %s\n", argv[0], addr);
|
||||
sleep(1);
|
||||
|
||||
/* exit */
|
||||
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
|
||||
}
|
||||
|
||||
void exit_example(int status, int sockfd, pthread_t *client_daemon)
|
||||
{
|
||||
if (sockfd != -1) close(sockfd);
|
||||
if (client_daemon != NULL) pthread_cancel(*client_daemon);
|
||||
exit(status);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void publish_callback(void** unused, struct mqtt_response_publish *published)
|
||||
{
|
||||
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
|
||||
char* topic_name = (char*) malloc(published->topic_name_size + 1);
|
||||
memcpy(topic_name, published->topic_name, published->topic_name_size);
|
||||
topic_name[published->topic_name_size] = '\0';
|
||||
|
||||
printf("Received publish('%s'): %s\n", topic_name, (const char*) published->application_message);
|
||||
|
||||
free(topic_name);
|
||||
}
|
||||
|
||||
void* client_refresher(void* client)
|
||||
{
|
||||
while(1)
|
||||
{
|
||||
mqtt_sync((struct mqtt_client*) client);
|
||||
usleep(100000U);
|
||||
}
|
||||
return NULL;
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
#ifndef __BIO_SOCKET_TEMPLATE_H__
|
||||
#define __BIO_SOCKET_TEMPLATE_H__
|
||||
|
||||
#include <openssl/bio.h>
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
|
||||
/*
|
||||
A template for opening a non-blocking BIO socket.
|
||||
*/
|
||||
BIO* open_nb_socket(const char* addr, const char* port) {
|
||||
BIO* bio = BIO_new_connect(addr);
|
||||
BIO_set_nbio(bio, 1);
|
||||
BIO_set_conn_port(bio, port);
|
||||
|
||||
/* timeout after 10 seconds */
|
||||
int start_time = time(NULL);
|
||||
while(BIO_do_connect(bio) == 0 && (int)time(NULL) - start_time < 10);
|
||||
|
||||
if (BIO_do_connect(bio) <= 0) {
|
||||
fprintf(stderr, "Failed to open socket: BIO_do_connect returned <= 0\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return bio;
|
||||
}
|
||||
|
||||
#endif
|
|
@ -0,0 +1,145 @@
|
|||
#ifndef __MBEDTLS_SOCKET_TEMPLATE_H__
|
||||
#define __MBEDTLS_SOCKET_TEMPLATE_H__
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <mbedtls/error.h>
|
||||
#include <mbedtls/entropy.h>
|
||||
#include <mbedtls/ctr_drbg.h>
|
||||
#include <mbedtls/net_sockets.h>
|
||||
#include <mbedtls/ssl.h>
|
||||
|
||||
#if !defined(MBEDTLS_NET_POLL_READ)
|
||||
/* compat for older mbedtls */
|
||||
#define MBEDTLS_NET_POLL_READ 1
|
||||
#define MBEDTLS_NET_POLL_WRITE 1
|
||||
|
||||
int
|
||||
mbedtls_net_poll(mbedtls_net_context * ctx, uint32_t rw, uint32_t timeout)
|
||||
{
|
||||
/* XXX this is not ideal but good enough for an example */
|
||||
usleep(300);
|
||||
return 1;
|
||||
}
|
||||
#endif
|
||||
|
||||
struct mbedtls_context {
|
||||
mbedtls_net_context net_ctx;
|
||||
mbedtls_ssl_context ssl_ctx;
|
||||
mbedtls_ssl_config ssl_conf;
|
||||
mbedtls_x509_crt ca_crt;
|
||||
mbedtls_entropy_context entropy;
|
||||
mbedtls_ctr_drbg_context ctr_drbg;
|
||||
};
|
||||
|
||||
void failed(const char *fn, int rv) {
|
||||
char buf[100];
|
||||
mbedtls_strerror(rv, buf, sizeof(buf));
|
||||
printf("%s failed with %x (%s)\n", fn, -rv, buf);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
void cert_verify_failed(uint32_t rv) {
|
||||
char buf[512];
|
||||
mbedtls_x509_crt_verify_info(buf, sizeof(buf), "\t", rv);
|
||||
printf("Certificate verification failed (%0" PRIx32 ")\n%s\n", rv, buf);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/*
|
||||
A template for opening a non-blocking mbed TLS connection.
|
||||
*/
|
||||
void open_nb_socket(struct mbedtls_context *ctx,
|
||||
const char *hostname,
|
||||
const char *port,
|
||||
const char *ca_file) {
|
||||
const unsigned char *additional = (const unsigned char *)"MQTT-C";
|
||||
size_t additional_len = 6;
|
||||
int rv;
|
||||
|
||||
mbedtls_net_context *net_ctx = &ctx->net_ctx;
|
||||
mbedtls_ssl_context *ssl_ctx = &ctx->ssl_ctx;
|
||||
mbedtls_ssl_config *ssl_conf = &ctx->ssl_conf;
|
||||
mbedtls_x509_crt *ca_crt = &ctx->ca_crt;
|
||||
mbedtls_entropy_context *entropy = &ctx->entropy;
|
||||
mbedtls_ctr_drbg_context *ctr_drbg = &ctx->ctr_drbg;
|
||||
|
||||
mbedtls_entropy_init(entropy);
|
||||
mbedtls_ctr_drbg_init(ctr_drbg);
|
||||
rv = mbedtls_ctr_drbg_seed(ctr_drbg, mbedtls_entropy_func, entropy,
|
||||
additional, additional_len);
|
||||
if (rv != 0) {
|
||||
failed("mbedtls_ctr_drbg_seed", rv);
|
||||
}
|
||||
|
||||
mbedtls_x509_crt_init(ca_crt);
|
||||
rv = mbedtls_x509_crt_parse_file(ca_crt, ca_file);
|
||||
if (rv != 0) {
|
||||
failed("mbedtls_x509_crt_parse_file", rv);
|
||||
}
|
||||
|
||||
mbedtls_ssl_config_init(ssl_conf);
|
||||
rv = mbedtls_ssl_config_defaults(ssl_conf, MBEDTLS_SSL_IS_CLIENT,
|
||||
MBEDTLS_SSL_TRANSPORT_STREAM,
|
||||
MBEDTLS_SSL_PRESET_DEFAULT);
|
||||
if (rv != 0) {
|
||||
failed("mbedtls_ssl_config_defaults", rv);
|
||||
}
|
||||
mbedtls_ssl_conf_ca_chain(ssl_conf, ca_crt, NULL);
|
||||
mbedtls_ssl_conf_authmode(ssl_conf, MBEDTLS_SSL_VERIFY_OPTIONAL);
|
||||
mbedtls_ssl_conf_rng(ssl_conf, mbedtls_ctr_drbg_random, ctr_drbg);
|
||||
|
||||
mbedtls_net_init(net_ctx);
|
||||
rv = mbedtls_net_connect(net_ctx, hostname, port, MBEDTLS_NET_PROTO_TCP);
|
||||
if (rv != 0) {
|
||||
failed("mbedtls_net_connect", rv);
|
||||
}
|
||||
rv = mbedtls_net_set_nonblock(net_ctx);
|
||||
if (rv != 0) {
|
||||
failed("mbedtls_net_set_nonblock", rv);
|
||||
}
|
||||
|
||||
mbedtls_ssl_init(ssl_ctx);
|
||||
rv = mbedtls_ssl_setup(ssl_ctx, ssl_conf);
|
||||
if (rv != 0) {
|
||||
failed("mbedtls_ssl_setup", rv);
|
||||
}
|
||||
rv = mbedtls_ssl_set_hostname(ssl_ctx, hostname);
|
||||
if (rv != 0) {
|
||||
failed("mbedtls_ssl_set_hostname", rv);
|
||||
}
|
||||
mbedtls_ssl_set_bio(ssl_ctx, net_ctx,
|
||||
mbedtls_net_send, mbedtls_net_recv, NULL);
|
||||
|
||||
for (;;) {
|
||||
rv = mbedtls_ssl_handshake(ssl_ctx);
|
||||
uint32_t want = 0;
|
||||
if (rv == MBEDTLS_ERR_SSL_WANT_READ) {
|
||||
want |= MBEDTLS_NET_POLL_READ;
|
||||
} else if (rv == MBEDTLS_ERR_SSL_WANT_WRITE) {
|
||||
want |= MBEDTLS_NET_POLL_WRITE;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
rv = mbedtls_net_poll(net_ctx, want, -1);
|
||||
if (rv < 0) {
|
||||
failed("mbedtls_net_poll", rv);
|
||||
}
|
||||
}
|
||||
if (rv != 0) {
|
||||
failed("mbedtls_ssl_handshake", rv);
|
||||
}
|
||||
uint32_t result = mbedtls_ssl_get_verify_result(ssl_ctx);
|
||||
if (result != 0) {
|
||||
if (result == (uint32_t)-1) {
|
||||
failed("mbedtls_ssl_get_verify_result", result);
|
||||
} else {
|
||||
cert_verify_failed(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
|
@ -0,0 +1,52 @@
|
|||
#ifndef __OPENSSL_SOCKET_TEMPLATE_H__
|
||||
#define __OPENSSL_SOCKET_TEMPLATE_H__
|
||||
|
||||
#include <openssl/bio.h>
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
|
||||
/*
|
||||
A template for opening a non-blocking OpenSSL connection.
|
||||
*/
|
||||
void open_nb_socket(BIO** bio, SSL_CTX** ssl_ctx, const char* addr, const char* port, const char* ca_file, const char* ca_path) {
|
||||
*ssl_ctx = SSL_CTX_new(SSLv23_client_method());
|
||||
SSL* ssl;
|
||||
|
||||
/* load certificate */
|
||||
if (!SSL_CTX_load_verify_locations(*ssl_ctx, ca_file, ca_path)) {
|
||||
printf("error: failed to load certificate\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* open BIO socket */
|
||||
*bio = BIO_new_ssl_connect(*ssl_ctx);
|
||||
BIO_get_ssl(*bio, &ssl);
|
||||
SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY);
|
||||
BIO_set_conn_hostname(*bio, addr);
|
||||
BIO_set_nbio(*bio, 1);
|
||||
BIO_set_conn_port(*bio, port);
|
||||
|
||||
/* wait for connect with 10 second timeout */
|
||||
int start_time = time(NULL);
|
||||
int do_connect_rv = BIO_do_connect(*bio);
|
||||
while(do_connect_rv <= 0 && BIO_should_retry(*bio) && (int)time(NULL) - start_time < 10) {
|
||||
do_connect_rv = BIO_do_connect(*bio);
|
||||
}
|
||||
if (do_connect_rv <= 0) {
|
||||
printf("error: %s\n", ERR_reason_error_string(ERR_get_error()));
|
||||
BIO_free_all(*bio);
|
||||
SSL_CTX_free(*ssl_ctx);
|
||||
*bio = NULL;
|
||||
*ssl_ctx=NULL;
|
||||
return;
|
||||
}
|
||||
|
||||
/* verify certificate */
|
||||
if (SSL_get_verify_result(ssl) != X509_V_OK) {
|
||||
/* Handle the failed verification */
|
||||
printf("error: x509 certificate verification failed\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
|
@ -0,0 +1,59 @@
|
|||
#ifndef __POSIX_SOCKET_TEMPLATE_H__
|
||||
#define __POSIX_SOCKET_TEMPLATE_H__
|
||||
|
||||
#include <stdio.h>
|
||||
#include <sys/types.h>
|
||||
#if !defined(WIN32)
|
||||
#include <sys/socket.h>
|
||||
#include <netdb.h>
|
||||
#endif
|
||||
#include <fcntl.h>
|
||||
|
||||
/*
|
||||
A template for opening a non-blocking POSIX socket.
|
||||
*/
|
||||
int open_nb_socket(const char* addr, const char* port) {
|
||||
struct addrinfo hints = {0};
|
||||
|
||||
hints.ai_family = AF_UNSPEC; /* IPv4 or IPv6 */
|
||||
hints.ai_socktype = SOCK_STREAM; /* Must be TCP */
|
||||
int sockfd = -1;
|
||||
int rv;
|
||||
struct addrinfo *p, *servinfo;
|
||||
|
||||
/* get address information */
|
||||
rv = getaddrinfo(addr, port, &hints, &servinfo);
|
||||
if(rv != 0) {
|
||||
fprintf(stderr, "Failed to open socket (getaddrinfo): %s\n", gai_strerror(rv));
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* open the first possible socket */
|
||||
for(p = servinfo; p != NULL; p = p->ai_next) {
|
||||
sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
|
||||
if (sockfd == -1) continue;
|
||||
|
||||
/* connect to server */
|
||||
rv = connect(sockfd, servinfo->ai_addr, servinfo->ai_addrlen);
|
||||
if(rv == -1) continue;
|
||||
break;
|
||||
}
|
||||
|
||||
/* free servinfo */
|
||||
freeaddrinfo(servinfo);
|
||||
|
||||
/* make non-blocking */
|
||||
#if !defined(WIN32)
|
||||
if (sockfd != -1) fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL) | O_NONBLOCK);
|
||||
#else
|
||||
if (sockfd != INVALID_SOCKET) {
|
||||
int iMode = 1;
|
||||
ioctlsocket(sockfd, FIONBIO, &iMode);
|
||||
}
|
||||
#endif
|
||||
|
||||
/* return the new socket fd */
|
||||
return sockfd;
|
||||
}
|
||||
|
||||
#endif
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,157 @@
|
|||
#ifndef __MQTT_PAL_H__
|
||||
#define __MQTT_PAL_H__
|
||||
|
||||
/*
|
||||
MIT License
|
||||
|
||||
Copyright(c) 2018 Liam Bindle
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files(the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions :
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @file
|
||||
* @brief Includes/supports the types/calls required by the MQTT-C client.
|
||||
*
|
||||
* @note This is the \em only file included in mqtt.h, and mqtt.c. It is therefore
|
||||
* responsible for including/supporting all the required types and calls.
|
||||
*
|
||||
* @defgroup pal Platform abstraction layer
|
||||
* @brief Documentation of the types and calls required to port MQTT-C to a new platform.
|
||||
*
|
||||
* mqtt_pal.h is the \em only header file included in mqtt.c. Therefore, to port MQTT-C to a
|
||||
* new platform the following types, functions, constants, and macros must be defined in
|
||||
* mqtt_pal.h:
|
||||
* - Types:
|
||||
* - \c size_t, \c ssize_t
|
||||
* - \c uint8_t, \c uint16_t, \c uint32_t
|
||||
* - \c va_list
|
||||
* - \c mqtt_pal_time_t : return type of \c MQTT_PAL_TIME()
|
||||
* - \c mqtt_pal_mutex_t : type of the argument that is passed to \c MQTT_PAL_MUTEX_LOCK and
|
||||
* \c MQTT_PAL_MUTEX_RELEASE
|
||||
* - Functions:
|
||||
* - \c memcpy, \c strlen
|
||||
* - \c va_start, \c va_arg, \c va_end
|
||||
* - Constants:
|
||||
* - \c INT_MIN
|
||||
*
|
||||
* Additionally, three macro's are required:
|
||||
* - \c MQTT_PAL_HTONS(s) : host-to-network endian conversion for uint16_t.
|
||||
* - \c MQTT_PAL_NTOHS(s) : network-to-host endian conversion for uint16_t.
|
||||
* - \c MQTT_PAL_TIME() : returns [type: \c mqtt_pal_time_t] current time in seconds.
|
||||
* - \c MQTT_PAL_MUTEX_LOCK(mtx_pointer) : macro that locks the mutex pointed to by \c mtx_pointer.
|
||||
* - \c MQTT_PAL_MUTEX_RELEASE(mtx_pointer) : macro that unlocks the mutex pointed to by
|
||||
* \c mtx_pointer.
|
||||
*
|
||||
* Lastly, \ref mqtt_pal_sendall and \ref mqtt_pal_recvall, must be implemented in mqtt_pal.c
|
||||
* for sending and receiving data using the platforms socket calls.
|
||||
*/
|
||||
|
||||
|
||||
/* UNIX-like platform support */
|
||||
#if defined(__unix__) || defined(__APPLE__)
|
||||
#include <limits.h>
|
||||
#include <string.h>
|
||||
#include <stdarg.h>
|
||||
#include <time.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#define MQTT_PAL_HTONS(s) htons(s)
|
||||
#define MQTT_PAL_NTOHS(s) ntohs(s)
|
||||
|
||||
#define MQTT_PAL_TIME() time(NULL)
|
||||
|
||||
typedef time_t mqtt_pal_time_t;
|
||||
typedef pthread_mutex_t mqtt_pal_mutex_t;
|
||||
|
||||
#define MQTT_PAL_MUTEX_INIT(mtx_ptr) pthread_mutex_init(mtx_ptr, NULL)
|
||||
#define MQTT_PAL_MUTEX_LOCK(mtx_ptr) pthread_mutex_lock(mtx_ptr)
|
||||
#define MQTT_PAL_MUTEX_UNLOCK(mtx_ptr) pthread_mutex_unlock(mtx_ptr)
|
||||
|
||||
#ifndef MQTT_USE_CUSTOM_SOCKET_HANDLE
|
||||
#ifdef MQTT_USE_MBEDTLS
|
||||
struct mbedtls_ssl_context;
|
||||
typedef struct mbedtls_ssl_context *mqtt_pal_socket_handle;
|
||||
#elif defined(MQTT_USE_BIO)
|
||||
#include <openssl/bio.h>
|
||||
typedef BIO* mqtt_pal_socket_handle;
|
||||
#else
|
||||
typedef int mqtt_pal_socket_handle;
|
||||
#endif
|
||||
#endif
|
||||
#elif defined(_MSC_VER)
|
||||
#include <limits.h>
|
||||
#include <windows.h>
|
||||
#include <time.h>
|
||||
#include <stdint.h>
|
||||
#include <winsock2.h>
|
||||
|
||||
typedef SSIZE_T ssize_t;
|
||||
#define MQTT_PAL_HTONS(s) htons(s)
|
||||
#define MQTT_PAL_NTOHS(s) ntohs(s)
|
||||
|
||||
#define MQTT_PAL_TIME() time(NULL)
|
||||
|
||||
typedef time_t mqtt_pal_time_t;
|
||||
typedef CRITICAL_SECTION mqtt_pal_mutex_t;
|
||||
|
||||
#define MQTT_PAL_MUTEX_INIT(mtx_ptr) InitializeCriticalSection(mtx_ptr)
|
||||
#define MQTT_PAL_MUTEX_LOCK(mtx_ptr) EnterCriticalSection(mtx_ptr)
|
||||
#define MQTT_PAL_MUTEX_UNLOCK(mtx_ptr) LeaveCriticalSection(mtx_ptr)
|
||||
|
||||
|
||||
#ifndef MQTT_USE_CUSTOM_SOCKET_HANDLE
|
||||
#ifdef MQTT_USE_BIO
|
||||
#include <openssl/bio.h>
|
||||
typedef BIO* mqtt_pal_socket_handle;
|
||||
#else
|
||||
typedef SOCKET mqtt_pal_socket_handle;
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Sends all the bytes in a buffer.
|
||||
* @ingroup pal
|
||||
*
|
||||
* @param[in] fd The file-descriptor (or handle) of the socket.
|
||||
* @param[in] buf A pointer to the first byte in the buffer to send.
|
||||
* @param[in] len The number of bytes to send (starting at \p buf).
|
||||
* @param[in] flags Flags which are passed to the underlying socket.
|
||||
*
|
||||
* @returns The number of bytes sent if successful, an \ref MQTTErrors otherwise.
|
||||
*/
|
||||
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags);
|
||||
|
||||
/**
|
||||
* @brief Non-blocking receive all the byte available.
|
||||
* @ingroup pal
|
||||
*
|
||||
* @param[in] fd The file-descriptor (or handle) of the socket.
|
||||
* @param[in] buf A pointer to the receive buffer.
|
||||
* @param[in] bufsz The max number of bytes that can be put into \p buf.
|
||||
* @param[in] flags Flags which are passed to the underlying socket.
|
||||
*
|
||||
* @returns The number of bytes received if successful, an \ref MQTTErrors otherwise.
|
||||
*/
|
||||
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags);
|
||||
|
||||
#endif
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,203 @@
|
|||
/*
|
||||
MIT License
|
||||
|
||||
Copyright(c) 2018 Liam Bindle
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files(the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions :
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
|
||||
#include <mqtt.h>
|
||||
|
||||
/**
|
||||
* @file
|
||||
* @brief Implements @ref mqtt_pal_sendall and @ref mqtt_pal_recvall and
|
||||
* any platform-specific helpers you'd like.
|
||||
* @cond Doxygen_Suppress
|
||||
*/
|
||||
|
||||
|
||||
#ifdef MQTT_USE_MBEDTLS
|
||||
#include <mbedtls/ssl.h>
|
||||
|
||||
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
|
||||
size_t sent = 0;
|
||||
while(sent < len) {
|
||||
int rv = mbedtls_ssl_write(fd, buf + sent, len - sent);
|
||||
if (rv < 0) {
|
||||
if (rv == MBEDTLS_ERR_SSL_WANT_READ ||
|
||||
rv == MBEDTLS_ERR_SSL_WANT_WRITE
|
||||
#if defined(MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS)
|
||||
|| rv == MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS
|
||||
#endif
|
||||
#if defined(MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS)
|
||||
|| rv == MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS
|
||||
#endif
|
||||
) {
|
||||
/* should call mbedtls_ssl_writer later again */
|
||||
break;
|
||||
}
|
||||
return MQTT_ERROR_SOCKET_ERROR;
|
||||
}
|
||||
sent += (size_t) rv;
|
||||
}
|
||||
return sent;
|
||||
}
|
||||
|
||||
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
|
||||
const void *const start = buf;
|
||||
int rv;
|
||||
do {
|
||||
rv = mbedtls_ssl_read(fd, buf, bufsz);
|
||||
if (rv < 0) {
|
||||
if (rv == MBEDTLS_ERR_SSL_WANT_READ ||
|
||||
rv == MBEDTLS_ERR_SSL_WANT_WRITE
|
||||
#if defined(MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS)
|
||||
|| rv == MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS
|
||||
#endif
|
||||
#if defined(MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS)
|
||||
|| rv == MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS
|
||||
#endif
|
||||
) {
|
||||
/* should call mbedtls_ssl_read later again */
|
||||
break;
|
||||
}
|
||||
return MQTT_ERROR_SOCKET_ERROR;
|
||||
}
|
||||
buf = (char*)buf + rv;
|
||||
bufsz -= rv;
|
||||
} while (rv > 0);
|
||||
|
||||
return buf - start;
|
||||
}
|
||||
|
||||
#elif defined(MQTT_USE_BIO)
|
||||
#include <openssl/bio.h>
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
|
||||
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
|
||||
size_t sent = 0;
|
||||
while(sent < len) {
|
||||
int tmp = BIO_write(fd, buf + sent, len - sent);
|
||||
if (tmp > 0) {
|
||||
sent += (size_t) tmp;
|
||||
} else if (tmp <= 0 && !BIO_should_retry(fd)) {
|
||||
return MQTT_ERROR_SOCKET_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
return sent;
|
||||
}
|
||||
|
||||
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
|
||||
const void *const start = buf;
|
||||
int rv;
|
||||
do {
|
||||
rv = BIO_read(fd, buf, bufsz);
|
||||
if (rv > 0) {
|
||||
/* successfully read bytes from the socket */
|
||||
buf += rv;
|
||||
bufsz -= rv;
|
||||
} else if (!BIO_should_retry(fd)) {
|
||||
/* an error occurred that wasn't "nothing to read". */
|
||||
return MQTT_ERROR_SOCKET_ERROR;
|
||||
}
|
||||
} while (!BIO_should_read(fd));
|
||||
|
||||
return (ssize_t)(buf - start);
|
||||
}
|
||||
|
||||
#elif defined(__unix__) || defined(__APPLE__)
|
||||
|
||||
#include <errno.h>
|
||||
|
||||
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
|
||||
size_t sent = 0;
|
||||
while(sent < len) {
|
||||
ssize_t tmp = send(fd, buf + sent, len - sent, flags);
|
||||
if (tmp < 1) {
|
||||
return MQTT_ERROR_SOCKET_ERROR;
|
||||
}
|
||||
sent += (size_t) tmp;
|
||||
}
|
||||
return sent;
|
||||
}
|
||||
|
||||
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
|
||||
const void *const start = buf;
|
||||
ssize_t rv;
|
||||
do {
|
||||
rv = recv(fd, buf, bufsz, flags);
|
||||
if (rv > 0) {
|
||||
/* successfully read bytes from the socket */
|
||||
buf += rv;
|
||||
bufsz -= rv;
|
||||
} else if (rv < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
|
||||
/* an error occurred that wasn't "nothing to read". */
|
||||
return MQTT_ERROR_SOCKET_ERROR;
|
||||
}
|
||||
} while (rv > 0);
|
||||
|
||||
return buf - start;
|
||||
}
|
||||
|
||||
#elif defined(_MSC_VER)
|
||||
|
||||
#include <errno.h>
|
||||
|
||||
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
|
||||
size_t sent = 0;
|
||||
while(sent < len) {
|
||||
ssize_t tmp = send(fd, (char*)buf + sent, len - sent, flags);
|
||||
if (tmp < 1) {
|
||||
return MQTT_ERROR_SOCKET_ERROR;
|
||||
}
|
||||
sent += (size_t) tmp;
|
||||
}
|
||||
return sent;
|
||||
}
|
||||
|
||||
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
|
||||
const char *const start = buf;
|
||||
ssize_t rv;
|
||||
do {
|
||||
rv = recv(fd, buf, bufsz, flags);
|
||||
if (rv > 0) {
|
||||
/* successfully read bytes from the socket */
|
||||
buf = (char*)buf + rv;
|
||||
bufsz -= rv;
|
||||
} else if (rv < 0) {
|
||||
int err = WSAGetLastError();
|
||||
if (err != WSAEWOULDBLOCK) {
|
||||
/* an error occurred that wasn't "nothing to read". */
|
||||
return MQTT_ERROR_SOCKET_ERROR;
|
||||
}
|
||||
}
|
||||
} while (rv > 0);
|
||||
|
||||
return (ssize_t)((char*)buf - start);
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
#error No PAL!
|
||||
|
||||
#endif
|
||||
|
||||
/** @endcond */
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue