Created
January 8, 2022 15:21
-
-
Save pvtom/1ad56c5c4c3d6d6f98347b8037938217 to your computer and use it in GitHub Desktop.
MQTT Subscription Client based on Mosquitto Library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Preparation: sudo apt-get install libmosquitto-dev
 * Compilation: gcc mqttsub.c -o mqttsub -lmosquitto
 * Usage: ./mqttsub --help
 *
 */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#if !defined(__MACH__)
#include <malloc.h>
#endif
#include <sys/time.h>
#include <time.h>
#include <libgen.h>
#include <regex.h>
#include <mosquitto.h>
/*
 * Settings shared between main() and the libmosquitto callbacks.
 * The two string lists store the pointers they are given; the strings
 * themselves are not copied.
 */
struct _mqttattr {
    int qos;        /* QoS level passed when subscribing */
    int mode;       /* output format selector used by message_callback() */
    int verbose;    /* non-zero enables progress messages */
    int count;      /* messages still to be received; 0 means no limit */
    int tlen;       /* number of entries in topics */
    char **topics;  /* subscription patterns */
    int flen;       /* number of entries in filters */
    char **filters; /* regular expressions; matching topics are dropped */
};

typedef struct _mqttattr mqttattr;
mqttattr create_mqttattr() { | |
mqttattr c; | |
c.qos = 0; | |
c.mode = 0; | |
c.verbose = 0; | |
c.count = 0; | |
c.tlen = 0; | |
c.topics = NULL; | |
c.flen = 0; | |
c.filters = NULL; | |
return(c); | |
} | |
/*
 * Append a subscription pattern to mqtta->topics.
 *
 * The pattern is first validated with mosquitto_sub_topic_check(); an
 * invalid pattern produces a warning on stderr and is not added.
 * Only the pointer is stored, the string is not copied.
 *
 * Returns the new number of topics, or 0 when the pattern was rejected
 * or the list could not be grown.
 */
int add_topic(mqttattr *mqtta, char *topic) {
    if (mosquitto_sub_topic_check(topic) != MOSQ_ERR_SUCCESS) {
        fprintf(stderr, "Warning: '%s' is not a valid topic\n", topic);
        return 0;
    }

    char **grown = realloc(mqtta->topics, (mqtta->tlen + 1) * sizeof *grown);
    if (grown == NULL) {
        /* the existing list is still valid and left untouched */
        return 0;
    }

    grown[mqtta->tlen] = topic;
    mqtta->topics = grown;
    mqtta->tlen += 1;
    return mqtta->tlen;
}
/*
 * Append a regular-expression filter to mqtta->filters.
 * Only the pointer is stored, the string is not copied.
 *
 * Returns the new number of filters, or 0 when the list could not be
 * grown.
 */
int add_filter(mqttattr *mqtta, char *filter) {
    char **grown = realloc(mqtta->filters, (mqtta->flen + 1) * sizeof *grown);

    if (grown == NULL) {
        /* the existing list is still valid and left untouched */
        return 0;
    }

    grown[mqtta->flen] = filter;
    mqtta->filters = grown;
    mqtta->flen += 1;
    return mqtta->flen;
}
/*
 * Release the topic and filter lists owned by mqtta.
 *
 * Only the pointer arrays are freed; the strings they point to were
 * never copied by add_topic()/add_filter() and are left alone.
 * Both pointers are reset to NULL so that calling this function twice,
 * or adding entries afterwards, is safe. A NULL mqtta is ignored.
 */
void destroy_mqttattr(mqttattr *mqtta) {
    if (mqtta == NULL) {
        return;
    }

    free(mqtta->topics); /* free(NULL) is a no-op, no guard needed */
    mqtta->topics = NULL;
    mqtta->tlen = 0;

    free(mqtta->filters);
    mqtta->filters = NULL;
    mqtta->flen = 0;
}
/*
 * Write the current local time into ts as "YYYYMMDDhhmmss.uuuuuu"
 * (21 characters plus the terminating NUL).
 *
 * ts must point to a buffer of at least 22 bytes; at most 22 bytes are
 * ever written. If the local time cannot be determined, ts is set to
 * the empty string.
 *
 * Returns ts.
 */
char *now(char *ts) {
    enum { TS_SIZE = 22 }; /* 14 date/time digits + '.' + 6 digits + NUL */
    struct timeval tv;
    const struct tm *timeinfo;
    time_t secs;

    gettimeofday(&tv, NULL);
    secs = tv.tv_sec;
    timeinfo = localtime(&secs);
    if (timeinfo == NULL) {
        ts[0] = '\0';
        return ts;
    }

    /* tv_usec is suseconds_t, whose width varies; print it as long */
    snprintf(ts, TS_SIZE, "%.4d%.2d%.2d%.2d%.2d%.2d.%.6ld",
             timeinfo->tm_year + 1900, timeinfo->tm_mon + 1, timeinfo->tm_mday,
             timeinfo->tm_hour, timeinfo->tm_min, timeinfo->tm_sec,
             (long)tv.tv_usec);
    return ts;
}
/*
 * Test whether string contains a match for the POSIX extended regular
 * expression pattern. The pattern is compiled with REG_NEWLINE.
 *
 * Returns 1 on a match and 0 otherwise. A pattern that fails to compile
 * and a matcher error both count as "no match".
 */
int regex_match(const char *string, const char *pattern) {
    regex_t preg;
    int status;

    /* REG_NOSUB: only the yes/no answer is needed, not match offsets */
    if (regcomp(&preg, pattern, REG_EXTENDED | REG_NEWLINE | REG_NOSUB) != 0) {
        return 0;
    }

    status = regexec(&preg, string, 0, NULL, 0);
    regfree(&preg);

    /* regexec() returns 0 only on a real match; errors are not matches */
    return status == 0;
}
void connect_callback(struct mosquitto *mosq, void *obj, int result) { | |
char timestamp[24]; | |
now(timestamp); | |
mqttattr *mqtta = obj; | |
if (!result) { | |
mosquitto_subscribe_multiple(mosq, NULL, mqtta->tlen, (char *const *const)mqtta->topics, mqtta->qos, 0, NULL); | |
if (mqtta->verbose) { | |
printf("[%s] MQTT broker connected.\n", timestamp); | |
int i; | |
for (i = 0; i < mqtta->tlen; i++) { | |
printf("[%s] topic '%s' subscribed.\n", timestamp, mqtta->topics[i]); | |
} | |
for (i = 0; i < mqtta->flen; i++) { | |
printf("[%s] topics matching with '%s' will be filtered out.\n", timestamp, mqtta->filters[i]); | |
} | |
} | |
} | |
return; | |
} | |
void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { | |
bool match = 0; | |
char timestamp[24]; | |
mqttattr *mqtta = obj; | |
int i; | |
static int topic_width = 0; | |
static int match_width = 0; | |
now(timestamp); | |
for (i = 0; i < mqtta->flen; i++) { | |
if (regex_match(message->topic, mqtta->filters[i])) return; | |
} | |
for (i = 0; i < mqtta->tlen; i++) { | |
mosquitto_topic_matches_sub(mqtta->topics[i], message->topic, &match); | |
if (match) break; | |
} | |
if (mqtta->count > 0) mqtta->count = mqtta->count - 1; | |
switch (mqtta->mode) { | |
case 1: | |
if (topic_width < strlen(message->topic)) topic_width = strlen(message->topic); | |
if (match_width < strlen(mqtta->topics[i])) match_width = strlen(mqtta->topics[i]); | |
printf("[%s] %-*s %-*s %.*s\n", timestamp, match_width, mqtta->topics[i], topic_width, message->topic, message->payloadlen, (char*)message->payload); | |
break; | |
case 2: | |
printf("[%s] %s %s %.*s\n", timestamp, mqtta->topics[i], message->topic, message->payloadlen, (char*)message->payload); | |
break; | |
case 3: | |
printf("[%s] %s %.*s\n", timestamp, message->topic, message->payloadlen, (char*)message->payload); | |
break; | |
default: | |
printf("%s %.*s\n", message->topic, message->payloadlen, (char*)message->payload); | |
break; | |
} | |
return; | |
} | |
int main(int argc, char **argv) { | |
char *mqtt_host = NULL; | |
int mqtt_port = 1883; | |
char cid[20]; | |
struct mosquitto *mosq; | |
int term = 0; | |
int rc = 0; | |
int i = 0; | |
mqttattr mqtta = create_mqttattr(); | |
while (i < argc) { | |
if ((!strcmp(argv[i], "-h")) && (i+1 < argc)) mqtt_host = argv[++i]; | |
if ((!strcmp(argv[i], "-p")) && (i+1 < argc)) mqtt_port = atoi(argv[++i]); | |
if ((!strcmp(argv[i], "-q")) && (i+1 < argc)) mqtta.qos = atoi(argv[++i]); | |
if ((!strcmp(argv[i], "-m")) && (i+1 < argc)) mqtta.mode = atoi(argv[++i]); | |
if ((!strcmp(argv[i], "-t")) && (i+1 < argc)) add_topic(&mqtta, argv[++i]); | |
if ((!strcmp(argv[i], "-f")) && (i+1 < argc)) add_filter(&mqtta, argv[++i]); | |
if ((!strcmp(argv[i], "-r")) && (i+1 < argc)) term = atoi(argv[++i]); | |
if ((!strcmp(argv[i], "-n")) && (i+1 < argc)) mqtta.count = atoi(argv[++i]); | |
if (!strcmp(argv[i], "-v")) mqtta.verbose = 1; | |
if (!strcmp(argv[i], "--help")) { | |
printf("MQTT subscription tool\nusage: %s -h <host> -p <port> -q <qos 0..2> -m <output mode 0..3> -t <topic 1> ... -t <topic n> -f <regex filter 1> ... -f <regex filter n> -r <runtime in seconds> -n <number of excepted messages> -v --help\n", basename(argv[0])); | |
return(0); | |
} | |
i++; | |
} | |
if (!mqtt_host) mqtt_host = "localhost"; | |
if (!mqtta.tlen) add_topic(&mqtta, "#"); | |
if ((mqtta.qos < 0) || (mqtta.qos > 2)) mqtta.qos = 0; | |
sprintf(cid, "mosqsub/%d", getpid()); | |
mosquitto_lib_init(); | |
mosq = mosquitto_new(cid, true, &mqtta); | |
if (mosq) { | |
mosquitto_connect_callback_set(mosq, connect_callback); | |
mosquitto_message_callback_set(mosq, message_callback); | |
if (mqtta.verbose) { | |
int major, minor, revision; | |
mosquitto_lib_version(&major, &minor, &revision); | |
printf("%s (libmosquitto %d.%d.%d)\n", basename(argv[0]), major, minor, revision); | |
printf("Connecting (%s) to %s:%d with qos=%d\n", cid, mqtt_host, mqtt_port, mqtta.qos); | |
} | |
rc = mosquitto_connect(mosq, mqtt_host, mqtt_port, 60); | |
if (!rc) { | |
if (term || mqtta.count) { | |
if (term && mqtta.verbose) printf("Looping for %d seconds\n", term); | |
if (mqtta.count && mqtta.verbose) printf("Looping until %d message(s) received\n", mqtta.count); | |
time_t start = time(NULL); | |
int c = !mqtta.count; | |
while((!term || (time(NULL) < start + term)) && (c || mqtta.count)) { | |
rc = mosquitto_loop(mosq, -1, 1); | |
if (rc) { | |
sleep(1); | |
mosquitto_reconnect(mosq); | |
} | |
} | |
} else { | |
if (mqtta.verbose) printf("Looping forever...\n"); | |
rc = mosquitto_loop_forever(mosq, -1, 1); | |
} | |
} else { | |
fprintf(stderr, "Error: Could not connect to '%s:%d'\n", mqtt_host, mqtt_port); | |
} | |
mosquitto_disconnect(mosq); | |
mosquitto_destroy(mosq); | |
} | |
mosquitto_lib_cleanup(); | |
destroy_mqttattr(&mqtta); | |
return(rc); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage: mqttsub -h <host> -p <port> -q <qos 0..2> -m <output mode 0..3> -t <topic 1> ... -t <topic n> -f <regex filter 1> ... -f <regex filter n> -r <runtime in seconds> -n <number of expected messages> -v --help