@@ -117,7 +117,7 @@ STATIC void subscribed_cb(mqtt_obj_t *self, const char *topic)
117
117
tuple [0 ] = MP_OBJ_FROM_PTR (self );
118
118
tuple [1 ] = mp_obj_new_str (self -> name , strlen (self -> name ));
119
119
if (topic ) tuple [2 ] = mp_obj_new_str (topic , strlen (topic ));
120
- else tuple [2 ] = mp_obj_new_str ("?" , 2 );
120
+ else tuple [2 ] = mp_obj_new_str ("?? " , 2 );
121
121
122
122
mp_sched_schedule ((mp_obj_t )self -> mpy_subscribed_cb , mp_obj_new_tuple (3 , tuple ));
123
123
}
@@ -131,7 +131,7 @@ STATIC void unsubscribed_cb(mqtt_obj_t *self, const char *topic)
131
131
tuple [0 ] = MP_OBJ_FROM_PTR (self );
132
132
tuple [1 ] = mp_obj_new_str (self -> name , strlen (self -> name ));
133
133
if (topic ) tuple [2 ] = mp_obj_new_str (topic , strlen (topic ));
134
- else tuple [2 ] = mp_obj_new_str ("?" , 2 );
134
+ else tuple [2 ] = mp_obj_new_str ("?? " , 2 );
135
135
136
136
mp_sched_schedule ((mp_obj_t )self -> mpy_unsubscribed_cb , mp_obj_new_tuple (3 , tuple ));
137
137
}
@@ -141,13 +141,14 @@ STATIC void unsubscribed_cb(mqtt_obj_t *self, const char *topic)
141
141
STATIC void published_cb (mqtt_obj_t * self , const char * topic , int type )
142
142
{
143
143
if (self -> mpy_published_cb ) {
144
- mp_obj_t tuple [3 ];
144
+ mp_obj_t tuple [4 ];
145
145
tuple [0 ] = MP_OBJ_FROM_PTR (self );
146
146
tuple [1 ] = mp_obj_new_str (self -> name , strlen (self -> name ));
147
147
if (topic ) tuple [2 ] = mp_obj_new_str (topic , strlen (topic ));
148
- else tuple [2 ] = mp_obj_new_str ("?" , 2 );
148
+ else tuple [2 ] = mp_obj_new_str ("??" , 2 );
149
+ tuple [3 ] = mp_obj_new_int (type );
149
150
150
- mp_sched_schedule ((mp_obj_t )self -> mpy_published_cb , mp_obj_new_tuple (3 , tuple ));
151
+ mp_sched_schedule ((mp_obj_t )self -> mpy_published_cb , mp_obj_new_tuple (4 , tuple ));
151
152
}
152
153
}
153
154
@@ -317,15 +318,15 @@ STATIC void mqtt_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_
317
318
else if (self -> client -> config -> host ) server_uri = self -> client -> config -> host ;
318
319
319
320
mp_printf (print , "Mqtt[%s]\n (Server: %s:%u, Status: %s\n" , self -> name , server_uri , self -> client -> config -> port , sstat );
320
- if ((self -> client -> state == MQTT_STATE_CONNECTED ) || (self -> client -> state == MQTT_STATE_INIT )) {
321
+ // if ((self->client->state == MQTT_STATE_CONNECTED) || (self->client->state == MQTT_STATE_INIT)) {
321
322
mp_printf (print , " Client ID: %s, Clean session=%s, Keepalive=%ds\n LWT(" ,
322
323
self -> client -> connect_info .client_id , (self -> client -> connect_info .clean_session ? "True" : "False" ), self -> client -> connect_info .keepalive );
323
324
if (self -> client -> connect_info .will_topic ) {
324
325
mp_printf (print , "QoS=%d, Retain=%s, Topic='%s', Msg='%s')\n" ,
325
326
self -> client -> connect_info .will_qos , (self -> client -> connect_info .will_retain ? "True" : "False" ), self -> client -> connect_info .will_topic , self -> client -> connect_info .will_message );
326
327
}
327
328
else mp_printf (print , "not set)\n" );
328
- }
329
+ // }
329
330
/*
330
331
if ((self->client->settings->xMqttTask) && (self->client->settings->xMqttSendingTask)) {
331
332
mp_printf(print, " Used stack: %u/%u + %u/%u\n",
@@ -418,6 +419,9 @@ STATIC mp_obj_t mqtt_make_new(const mp_obj_type_t *type, size_t n_args, size_t n
418
419
// Populate settings
419
420
esp_mqtt_client_config_t mqtt_cfg = {0 };
420
421
422
+ // Set mqtt task priority
423
+ mqtt_cfg .task_prio = MICROPY_TASK_PRIORITY + 1 ;
424
+
421
425
// Event handle
422
426
mqtt_cfg .event_handle = mqtt_event_handler ;
423
427
@@ -584,9 +588,6 @@ STATIC mp_obj_t mqtt_op_config(mp_uint_t n_args, const mp_obj_t *pos_args, mp_ma
584
588
if (!self -> client ) {
585
589
mp_raise_ValueError ("Destroyed" );
586
590
}
587
- if (self -> client -> state < MQTT_STATE_INIT ) {
588
- mp_raise_ValueError ("Not initialized" );
589
- }
590
591
591
592
mp_arg_val_t args [MP_ARRAY_SIZE (mqtt_config_allowed_args )];
592
593
mp_arg_parse_all (n_args - 1 , pos_args + 1 , kw_args , MP_ARRAY_SIZE (mqtt_config_allowed_args ), mqtt_config_allowed_args , args );
@@ -709,7 +710,6 @@ STATIC mp_obj_t mqtt_op_subscribe(mp_uint_t n_args, const mp_obj_t *args)
709
710
if (checkClient (self ) != MQTT_STATE_CONNECTED ) return mp_const_false ;
710
711
711
712
const char * topic = mp_obj_str_get_str (args [1 ]);
712
- int wait = 2000 ;
713
713
int qos = 0 ;
714
714
if (n_args == 3 ) {
715
715
qos = mp_obj_get_int (args [2 ]);
@@ -726,15 +726,9 @@ STATIC mp_obj_t mqtt_op_subscribe(mp_uint_t n_args, const mp_obj_t *args)
726
726
self -> client -> config -> user_context = NULL ;
727
727
return mp_const_false ;
728
728
}
729
- while ((wait > 0 ) && (self -> subs_flag == 0 )) {
730
- vTaskDelay (10 / portTICK_PERIOD_MS );
731
- wait -= 10 ;
732
- }
733
- self -> client -> config -> user_context = NULL ;
734
- if (wait ) return mp_const_true ;
735
- else return mp_const_false ;
729
+ return mp_const_true ;
736
730
}
737
- MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN (mqtt_subscribe_obj , 2 , 3 , mqtt_op_subscribe );
731
+ MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN (mqtt_subscribe_obj , 2 , 4 , mqtt_op_subscribe );
738
732
739
733
//----------------------------------------------------------------------
740
734
STATIC mp_obj_t mqtt_op_unsubscribe (mp_obj_t self_in , mp_obj_t topic_in )
@@ -743,7 +737,6 @@ STATIC mp_obj_t mqtt_op_unsubscribe(mp_obj_t self_in, mp_obj_t topic_in)
743
737
if (checkClient (self ) != MQTT_STATE_CONNECTED ) return mp_const_false ;
744
738
745
739
const char * topic = mp_obj_str_get_str (topic_in );
746
- int wait = 2000 ;
747
740
self -> unsubs_flag = 0 ;
748
741
self -> client -> config -> user_context = (void * )topic ;
749
742
@@ -752,13 +745,7 @@ STATIC mp_obj_t mqtt_op_unsubscribe(mp_obj_t self_in, mp_obj_t topic_in)
752
745
self -> client -> config -> user_context = NULL ;
753
746
return mp_const_false ;
754
747
}
755
- while ((wait > 0 ) && (self -> unsubs_flag == 0 )) {
756
- vTaskDelay (10 / portTICK_PERIOD_MS );
757
- wait -= 10 ;
758
- }
759
- self -> client -> config -> user_context = NULL ;
760
- if (wait ) return mp_const_true ;
761
- else return mp_const_false ;
748
+ return mp_const_true ;
762
749
}
763
750
MP_DEFINE_CONST_FUN_OBJ_2 (mqtt_unsubscribe_obj , mqtt_op_unsubscribe );
764
751
@@ -772,15 +759,13 @@ STATIC mp_obj_t mqtt_op_publish(mp_uint_t n_args, const mp_obj_t *args)
772
759
const char * topic = mp_obj_str_get_str (args [1 ]);
773
760
const char * msg = mp_obj_str_get_data (args [2 ], & len );
774
761
775
- int wait = 2000 ;
776
762
int qos = 0 ;
777
763
if (n_args == 4 ) {
778
764
qos = mp_obj_get_int (args [3 ]);
779
765
if ((qos < 0 ) || (qos > 2 )) {
780
766
mp_raise_ValueError ("Wrong QoS value" );
781
767
}
782
768
}
783
- if (qos == 0 ) wait = 0 ;
784
769
785
770
int retain = 0 ;
786
771
if (n_args == 5 ) retain = mp_obj_is_true (args [4 ]);
@@ -793,12 +778,6 @@ STATIC mp_obj_t mqtt_op_publish(mp_uint_t n_args, const mp_obj_t *args)
793
778
self -> client -> config -> user_context = NULL ;
794
779
return mp_const_false ;
795
780
}
796
- while ((wait > 0 ) && (self -> publish_flag == 0 )) {
797
- vTaskDelay (10 / portTICK_PERIOD_MS );
798
- wait -= 10 ;
799
- }
800
- self -> client -> config -> user_context = NULL ;
801
-
802
781
return mp_const_true ;
803
782
}
804
783
MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN (mqtt_publish_obj , 3 , 5 , mqtt_op_publish );
@@ -845,12 +824,15 @@ STATIC mp_obj_t mqtt_op_stop(mp_obj_t self_in)
845
824
}
846
825
MP_DEFINE_CONST_FUN_OBJ_1 (mqtt_stop_obj , mqtt_op_stop );
847
826
848
- //--------------------------------------------
827
+ //---------------------------------------------
849
828
STATIC mp_obj_t mqtt_op_start (mp_obj_t self_in )
850
829
{
851
830
mqtt_obj_t * self = MP_OBJ_TO_PTR (self_in );
852
831
853
832
if ((self -> client ) && (self -> client -> state < MQTT_STATE_INIT )) {
833
+ if (self -> client -> connect_info .clean_session ) {
834
+ nlr_raise (mp_obj_new_exception_msg (& mp_type_TypeError , "Client not in persistent session, free and create again" ));
835
+ }
854
836
int res = esp_mqtt_client_start (self -> client );
855
837
if (res != 0 ) {
856
838
nlr_raise (mp_obj_new_exception_msg (& mp_type_TypeError , "Error starting client" ));
@@ -899,6 +881,15 @@ STATIC mp_obj_t mqtt_op_free(mp_obj_t self_in)
899
881
}
900
882
MP_DEFINE_CONST_FUN_OBJ_1 (mqtt_free_obj , mqtt_op_free );
901
883
884
+ //-----------------------------------------------------------
885
+ STATIC mp_obj_t mqtt_debug (mp_obj_t self_in , mp_obj_t enable )
886
+ {
887
+ //mqtt_obj_t *self = MP_OBJ_TO_PTR(self_in);
888
+ transport_debug = mp_obj_is_true (enable );
889
+ return mp_const_none ;
890
+ }
891
+ MP_DEFINE_CONST_FUN_OBJ_2 (mqtt_debug_obj , mqtt_debug );
892
+
902
893
903
894
//=========================================================
904
895
STATIC const mp_rom_map_elem_t mqtt_locals_dict_table [] = {
@@ -910,6 +901,7 @@ STATIC const mp_rom_map_elem_t mqtt_locals_dict_table[] = {
910
901
{ MP_ROM_QSTR (MP_QSTR_stop ), (mp_obj_t )& mqtt_stop_obj },
911
902
{ MP_ROM_QSTR (MP_QSTR_start ), (mp_obj_t )& mqtt_start_obj },
912
903
{ MP_ROM_QSTR (MP_QSTR_free ), (mp_obj_t )& mqtt_free_obj },
904
+ { MP_ROM_QSTR (MP_QSTR_debug ), (mp_obj_t )& mqtt_debug_obj },
913
905
};
914
906
STATIC MP_DEFINE_CONST_DICT (mqtt_locals_dict , mqtt_locals_dict_table );
915
907
0 commit comments