diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b25c15b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*~ diff --git a/LinkedList.h b/LinkedList.h index 5b519a8..3161337 100644 --- a/LinkedList.h +++ b/LinkedList.h @@ -55,7 +55,7 @@ class LinkedList { Node* ret = n->next; delete n; length -= 1; - return n->next; + return ret; } Node* last; @@ -63,4 +63,59 @@ class LinkedList { uint16_t length; }; + +template +class LinkedList2{ +public: + LinkedList2():last(NULL),first(NULL),length(0){} + + void Add(NodeType* item) { + if (last == NULL) { + last = item; + first = item; + item->next = NULL; + item->prev = NULL; + } else { + last->next = item; + item->prev = last; + item->next = NULL; + last = item; + } + length += 1; + } + + template + NodeType* Find(SearchParamType input, bool(*search_func)(const NodeType*, SearchParamType)) + { + NodeType* current = first; + while (current != NULL) { + if (search_func(current, input)) + return current; + current = current->next; + } + return NULL; + } + + NodeType* Remove(NodeType* n) { + if (!n) + return NULL; + if (first == n) + first = n->next; + if (last == n) + last = n->prev; + if (n->prev) + n->prev->next = n->next; + if (n->next) + n->next->prev = n->prev; + NodeType* ret = n->next; + delete n; + length -= 1; + return ret; + } + + NodeType* last; + NodeType* first; + uint16_t length; +}; + #endif diff --git a/MeshBase.cpp b/MeshBase.cpp index b14946f..90157e5 100644 --- a/MeshBase.cpp +++ b/MeshBase.cpp @@ -3,7 +3,7 @@ #include "MeshBase.h" #define MAX_PACKET_SIZE 32 -#define MAX_PAYLOAD_SIZE (MAX_PACKET_SIZE - sizeof(Message)) +#define MAX_PAYLOAD_SIZE (MAX_PACKET_SIZE - sizeof(MeshBase::MessageHeader)) // -- Broadcast addresses -- #define PEER_DISCOVERY 1 @@ -12,23 +12,24 @@ #define TO_BROADCAST(x) (0xBB00000000LL + x) #define TO_ADDRESS(x) (0xAA00000000LL + x) -#define PEER_DISCOVERY_TIME 3000 -#define PEER_CHECK_TIME 4000 -#define PEER_TIMEOUT 2 +#define PEER_DISCOVERY_TIME 4000 +#define CHECK_TIME 4000 +#define PEER_TIMEOUT 3 +#define ASSEMBLY_TIMEOUT 2 MeshBase::MeshBase(uint8_t ce, uint8_t cs) : radio(ce, cs) , address(0) , last_broadcast_time(0) -, last_peer_check_time(0) +, last_check_time(0) +, application_capabilities(0) {} void MeshBase::Begin() { radio.begin(); radio.enableDynamicPayloads(); - radio.setRetries(2,1); - //radio.openReadingPipe(0, TO_ADDRESS(address)); + radio.setRetries(4,2); radio.openReadingPipe(1, TO_BROADCAST(PEER_DISCOVERY)); radio.setAutoAck(0, true); radio.setAutoAck(1, false); @@ -46,8 +47,7 @@ void MeshBase::Update() } // Recieve - uint8_t pipe_num; - if (radio.available(&pipe_num)) + if (radio.available()) { bool done = false; do { @@ -58,51 +58,124 @@ void MeshBase::Update() } while (!done); } - // Update peers - if (millis() - last_peer_check_time > PEER_CHECK_TIME) + // Do periodic checks + if (millis() - last_check_time > CHECK_TIME) { - LinkedList::Node* current = peers.first; - while(current != NULL) + // Check for expired peers { - current->item->time += 1; - if (current->item->time >= PEER_TIMEOUT) + LinkedList::Node* current = peers.first; + while(current != NULL) { - Serial.print("Lost Peer: "); - Serial.println(current->item->address, DEC); - current = peers.Remove(current); - } else { - current = current->next; + current->item->time += 1; + if (current->item->time >= PEER_TIMEOUT) + { + Serial.print("Lost Peer: "); + Serial.println(current->item->address, DEC); + current = peers.Remove(current); + } else { + current = current->next; + } } } - last_peer_check_time = millis(); + + // Check for expired packets + { + Message* current = assembly_list.first; + while(current != NULL) + { + current->age += 1; + if (current->age >= ASSEMBLY_TIMEOUT) + { + Serial.print("Dropped partial message. address="); + Serial.print(current->header.address_from, DEC); + Serial.print(" msg_id="); + Serial.print(current->header.msg_id); + Serial.print(" blocks_recieved="); + Serial.println(current->blocks_recieved); + current = assembly_list.Remove(current); + } else { + current = current->next; + } + } + } + last_check_time = millis(); + } +} + +bool FindStream(const MeshBase::Message* current, const MeshBase::MessageHeader* find) +{ + if (current->header.address_from != find->address_from) + return false; + if (current->header.msg_id != find->msg_id) + return false; + return true; +} + +void MeshBase::Message::AddPart(const void* payload, uint8_t len, uint8_t part_num, bool more_parts) +{ + uint8_t start_pos = part_num * MAX_PAYLOAD_SIZE; + uint8_t end_pos = len + (part_num * MAX_PAYLOAD_SIZE); + if (data == NULL) + data = malloc(end_pos); + if (end_pos > data_used) + data = realloc(data, end_pos); + memcpy(&static_cast(data)[start_pos], payload, len); + if (end_pos > data_used) + data_used = end_pos; + blocks_recieved += 1; + if (!more_parts) { + header.split_more = false; + header.split_part = part_num; + } + age = 0; +} + +bool MeshBase::Message::IsDone() const +{ + // We set the split_more to false if we recieved the last packet + // in the stream, and split_part to total number of blocks in the stream. + // So if split_more is false, and we have the right number of blocks_recieved + // we are good to go. + if (!header.split_more && blocks_recieved > header.split_part) { + return true; } + return false; +} + +MeshBase::Message::~Message() { + free(data); } void MeshBase::HandlePacket(const byte* data, uint8_t len) { - if (len < sizeof(Message)) + if (len < sizeof(MessageHeader)) return; - const MeshBase::Message* msg = (struct MeshBase::Message*)data; - uint8_t payload_length = len - sizeof(Message); - const byte* payload = data + sizeof(Message); - if (msg->split_more || msg->split_part != 0) - { - // Re-assembly needed - // TODO: Re-assemble packets - } else { - switch(msg->type) { + const MeshBase::MessageHeader* header = (struct MeshBase::MessageHeader*)data; + uint8_t payload_length = len - sizeof(MessageHeader); + const byte* payload = data + sizeof(MessageHeader); + if (header->protocol_version != 1) + return; + + Message* s = assembly_list.Find(header, &FindStream); + if (s == NULL) { + s = new Message(*header); + assembly_list.Add(s); + } + s->AddPart(payload, payload_length, header->split_part, header->split_more); + if (s->IsDone()) { + switch(header->type) { case type_peer_discovery: - HandlePeerDiscovery(msg, payload, payload_length); + HandlePeerDiscovery(&(s->header), s->data, s->data_used); break; default: - OnMessage(msg, payload, payload_length); + OnMessage(&(s->header), s->data, s->data_used); break; } - delete data; + assembly_list.Remove(s); } } -void MeshBase::HandlePeerDiscovery(const MeshBase::Message* msg, const void* buff, uint8_t length) +void MeshBase::HandlePeerDiscovery(const MeshBase::MessageHeader* msg, const void* buff, uint8_t length) { if (length != sizeof(PeerDiscoveryMessage)) return; @@ -121,11 +194,12 @@ void MeshBase::HandlePeerDiscovery(const MeshBase::Message* msg, const void* buf Serial.print(" num_peers="); Serial.println(pd->num_peers, DEC); Peer* p = new Peer(msg->address_from); + p->Update(pd); peers.Add(p); OnNewPeer(p); } else { // Existing peer, reset timer - peer->time = 0; + peer->Update(pd); } } @@ -135,7 +209,7 @@ void MeshBase::SendPeerDiscovery() MeshBase::PeerDiscoveryMessage payload; payload.protocol_version = 1; payload.network_capabilities = 0; - payload.application_capabilities = 0; + payload.application_capabilities = application_capabilities; payload.num_peers = peers.length; payload.uptime = millis() / 1000; SendMessage(PEER_DISCOVERY, type_peer_discovery, &payload, sizeof(payload), true); @@ -143,12 +217,14 @@ void MeshBase::SendPeerDiscovery() void MeshBase::SendMessage(uint32_t to, uint8_t type, const void* data, uint8_t length, bool is_broadcast) { + static uint8_t current_msg_id = 0; byte buff[MAX_PACKET_SIZE]; - Message* msg = (struct Message*)buff; + MessageHeader* msg = (struct MessageHeader*)buff; msg->protocol_version = 1; msg->ttl = 0; msg->type = type; msg->address_from = address; + msg->msg_id = current_msg_id++; uint8_t num_pkts = (length / MAX_PAYLOAD_SIZE) + 1; for (uint8_t num = 0; num < num_pkts; ++num) @@ -156,14 +232,34 @@ void MeshBase::SendMessage(uint32_t to, uint8_t type, const void* data, uint8_t uint8_t remaining_length = length - (num * MAX_PAYLOAD_SIZE); msg->split_part = num; msg->split_more = remaining_length > MAX_PAYLOAD_SIZE; - memcpy(buff + sizeof(Message), (const byte*)data + (num * MAX_PAYLOAD_SIZE), min(remaining_length, MAX_PAYLOAD_SIZE)); + memcpy(buff + sizeof(MessageHeader), (const byte*)data + (num * MAX_PAYLOAD_SIZE), min(remaining_length, MAX_PAYLOAD_SIZE)); + uint8_t wire_size = min(remaining_length + sizeof(MessageHeader), MAX_PACKET_SIZE); radio.stopListening(); + bool result = true; if (is_broadcast) radio.openWritingPipe(TO_BROADCAST(to)); else radio.openWritingPipe(TO_ADDRESS(to)); - radio.write(buff, min(remaining_length, MAX_PAYLOAD_SIZE)); + if (is_broadcast) { + //radio.startWrite(buff, wire_size); + result = radio.write(buff, wire_size); + } else { + result = radio.write(buff, wire_size); + if (result == false) { + // Issue transmitting packet, retry? + radio.startListening(); + delay(100); + radio.stopListening(); + result = radio.write(buff, wire_size); + } + Serial.print(" T Sending pkt split_part="); + Serial.print(msg->split_part); + Serial.print(" id="); + Serial.print(msg->msg_id); + Serial.print(" result="); + Serial.println(result); + } radio.startListening(); } } @@ -197,3 +293,9 @@ MeshBase::Peer* MeshBase::GetPeer(uint32_t a) return NULL; } +void MeshBase::Peer::Update(const PeerDiscoveryMessage* msg) +{ + application_capabilities = msg->application_capabilities; + time = 0; +} + diff --git a/MeshBase.h b/MeshBase.h index 76a2f7e..2d59b91 100644 --- a/MeshBase.h +++ b/MeshBase.h @@ -7,68 +7,92 @@ #define PACKED __attribute__ ((packed)) +typedef uint32_t address_t; + class MeshBase { public: MeshBase(uint8_t ce, uint8_t cs); - struct Peer { - uint32_t address; - uint16_t time; - Peer(uint32_t address) : address(address), time(0) {} - }; - - struct Message - { + struct MessageHeader { uint8_t protocol_version : 4; uint8_t ttl : 4; uint8_t msg_id; bool split_more : 1; uint8_t split_part : 7; uint8_t type; - uint32_t address_from; + address_t address_from; } PACKED; + struct Message { + Message(const MessageHeader& a) : header(a), data(NULL), data_used(0), blocks_recieved(0), next(NULL), prev(NULL), age(0) {} + ~Message(); + MessageHeader header; + void* data; + uint8_t data_used; + uint8_t blocks_recieved; + Message* next; + Message* prev; + uint8_t age; + + void AddPart(const void* data, uint8_t len, uint8_t part_num, bool more_parts); + bool IsDone() const; + }; + // -- Message types -- - enum message_type { + enum MessageType { type_peer_discovery, type_peer_list, type_user, }; + enum ApplicationCapabilities { + capability_publish_events = 1 >> 0, + }; + void Begin(); void Update(); - void SendMessage(uint32_t address, uint8_t type, const void* data, uint8_t length); - uint32_t GetAddress() const { return address; } + void SendMessage(address_t address, uint8_t type, const void* data, uint8_t length); + void SendMessage(address_t address, uint8_t type, const void* data, uint8_t length, bool is_broadcast); + address_t GetAddress() const { return address; } bool IsReady() const { return address != 0; } protected: - virtual void OnMessage(const MeshBase::Message* meta, const void* data, uint8_t length) = 0; + struct PeerDiscoveryMessage + { + uint8_t protocol_version; + uint8_t network_capabilities; // What routing/networking can I do for the network + uint8_t application_capabilities; // What type of data do I expose + uint16_t num_peers; // Number of direct peers + uint32_t uptime; // Seconds since boot + } PACKED; + + struct Peer { + Peer(uint32_t address) : address(address), time(0), application_capabilities(0) {} + uint32_t address; + uint8_t time; + uint8_t application_capabilities; + void Update(const PeerDiscoveryMessage* msg); + }; + + virtual void OnMessage(const MessageHeader* meta, const void* data, uint8_t length) = 0; virtual void OnNewPeer(Peer*) {} virtual void OnLostPeer(Peer*) {} + uint8_t application_capabilities; private: uint32_t address; RF24 radio; unsigned long last_broadcast_time; - unsigned long last_peer_check_time; + unsigned long last_check_time; void SendPeerDiscovery(); - void SendMessage(uint32_t address, uint8_t type, const void* data, uint8_t length, bool is_broadcast); - void HandlePeerDiscovery(const Message* msg, const void* buff, uint8_t length); + void HandlePeerDiscovery(const MessageHeader* msg, const void* buff, uint8_t length); void HandlePacket(const byte* data, uint8_t length); void ChooseAddress(); - + LinkedList peers; - + LinkedList2 assembly_list; + Peer* GetPeer(uint32_t address); - - struct PeerDiscoveryMessage - { - uint8_t protocol_version; - uint8_t network_capabilities; // What routing/networking can I do for the network - uint8_t application_capabilities; // What type of data do I expose - uint16_t num_peers; // Number of direct peers - uint32_t uptime; // Seconds since boot - } PACKED; }; diff --git a/Publisher.h b/Publisher.h new file mode 100644 index 0000000..bbbd91f --- /dev/null +++ b/Publisher.h @@ -0,0 +1,48 @@ +#ifndef PUBLISHER_H +#define PUBLISHER_H + +#include +#include "MeshBase.h" + +class PublishApp : public MeshBase +{ +public: + PublishApp() : MeshBase(9, 10) + { + application_capabilities |= MeshBase::capability_publish_events; + } + + void OnEvent(uint8_t event_data) + { + const Target* current = targets.first; + while (current != NULL) + { + SendMessage(current->address, type_on_event, &event_data, sizeof(event_data)); + current = current->next; + } + } + + enum PublishMessageType { + type_on_event = MeshBase::type_user, + type_subscribe, + }; +protected: + virtual void OnMessage(const MeshBase::MessageHeader* meta, const void* data, uint8_t length) + { + if (meta->type == type_subscribe) + { + targets.Add(new Target(meta->address_from)); + } + } +private: + struct Target + { + Target(address_t target) : address(target), prev(NULL), next(NULL) {} + address_t address; + Target* prev; + Target* next; + }; + LinkedList2 targets; +}; + +#endif // PUBLISHER_H diff --git a/RF_test.ino b/RF_test.ino deleted file mode 100644 index 6c94368..0000000 --- a/RF_test.ino +++ /dev/null @@ -1,41 +0,0 @@ -#include -#include "RF24.h" -#include "MeshBase.h" - -class App : public MeshBase -{ -public: - App() : MeshBase(9, 10) {} -protected: - virtual void OnMessage(const MeshBase::Message* meta, const void* data, uint8_t length) - { - Serial.print(meta->address_from, DEC); - Serial.print(" : "); - Serial.println((const char*)data); - } - virtual void OnNewPeer(Peer* p) - { - if (!IsReady()) return; - char buff[255]; - int len = snprintf(buff, 255, "Hello %u", p->address); - Serial.print("Me : "); - Serial.println(buff); - SendMessage(p->address, type_user, buff, len + 1); - } -}; - -App app; - -void setup() -{ - Serial.begin(9600); - Serial.println("Starting RF_TEST"); - randomSeed(analogRead(0)); - app.Begin(); -} - -void loop() -{ - app.Update(); - delay(100); -} diff --git a/examples/Publisher/Publisher.ino b/examples/Publisher/Publisher.ino new file mode 100644 index 0000000..a022f4c --- /dev/null +++ b/examples/Publisher/Publisher.ino @@ -0,0 +1,33 @@ +#include +#include "RF24.h" +#include "MeshBase.h" +#include "Publisher.h" +#include "LinkedList.h" + +PublishApp app; + +unsigned long last_time; +uint8_t sequence; + +void setup() +{ + Serial.begin(19200); + Serial.println("Starting..."); + randomSeed(analogRead(0)); + app.Begin(); + last_time = millis(); + sequence = 0; +} + +void loop() +{ + app.Update(); + delay(100); + if (millis() - last_time > 10000) + { + app.OnEvent(sequence); + ++sequence; + last_time = millis(); + } +} + diff --git a/examples/Subscriber/Subscriber.ino b/examples/Subscriber/Subscriber.ino new file mode 100644 index 0000000..ec213ae --- /dev/null +++ b/examples/Subscriber/Subscriber.ino @@ -0,0 +1,64 @@ +#include +#include "RF24.h" +#include "MeshBase.h" +#include "Publisher.h" +#include "LinkedList.h" + +class SubscribeApp : public MeshBase +{ +public: + SubscribeApp() : MeshBase(9, 10) {} + + enum PublishMessageType { + type_on_event = MeshBase::type_user, + type_subscribe, + }; +protected: + virtual void OnMessage(const MeshBase::MessageHeader* meta, const void* data, uint8_t length) + { + if (meta->type == type_on_event) + { + if (length != sizeof(uint8_t)) + { + Serial.println("LENGTH WRONG"); + return; + } + OnEvent(meta->address_from, *(uint8_t*)data); + } + } + + virtual void OnNewPeer(Peer* p) + { + if (p->application_capabilities & MeshBase::capability_publish_events) + { + Serial.print("Found peer that has events! address="); + Serial.println(p->address); + SendMessage(p->address, type_subscribe, "", 1); + } + } + + virtual void OnEvent(address_t address, uint8_t event_data) + { + Serial.print("Event from "); + Serial.print(address); + Serial.print(" data: "); + Serial.println(event_data); + } +}; + +SubscribeApp app; + +void setup() +{ + Serial.begin(19200); + Serial.println("Starting..."); + randomSeed(analogRead(0)); + app.Begin(); +} + +void loop() +{ + app.Update(); + delay(100); +} +