Skip to content

Commit c479b8e

Browse files
committed
Adding packet re-assemlby
There appears to still be a bug in either splitting or re-assembly
1 parent 50638ea commit c479b8e

File tree

3 files changed

+116
-30
lines changed

3 files changed

+116
-30
lines changed

MeshBase.cpp

Lines changed: 89 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include "MeshBase.h"
44

55
#define MAX_PACKET_SIZE 32
6-
#define MAX_PAYLOAD_SIZE (MAX_PACKET_SIZE - sizeof(Message))
6+
#define MAX_PAYLOAD_SIZE (MAX_PACKET_SIZE - sizeof(MeshBase::MessageHeader))
77

88
// -- Broadcast addresses --
99
#define PEER_DISCOVERY 1
@@ -28,7 +28,6 @@ void MeshBase::Begin()
2828
radio.begin();
2929
radio.enableDynamicPayloads();
3030
radio.setRetries(2,1);
31-
//radio.openReadingPipe(0, TO_ADDRESS(address));
3231
radio.openReadingPipe(1, TO_BROADCAST(PEER_DISCOVERY));
3332
radio.setAutoAck(0, true);
3433
radio.setAutoAck(1, false);
@@ -46,8 +45,7 @@ void MeshBase::Update()
4645
}
4746

4847
// Recieve
49-
uint8_t pipe_num;
50-
if (radio.available(&pipe_num))
48+
if (radio.available())
5149
{
5250
bool done = false;
5351
do {
@@ -78,31 +76,93 @@ void MeshBase::Update()
7876
}
7977
}
8078

79+
bool FindStream(const MeshBase::Message* current, const MeshBase::MessageHeader* find)
80+
{
81+
if (current->header.address_from != find->address_from)
82+
return false;
83+
if (current->header.msg_id != find->msg_id)
84+
return false;
85+
return true;
86+
}
87+
88+
void MeshBase::Message::AddPart(const void* payload, uint8_t len, uint8_t part_num, bool more_parts)
89+
{
90+
uint8_t start_pos = part_num * MAX_PAYLOAD_SIZE;
91+
uint8_t end_pos = len + (part_num * MAX_PAYLOAD_SIZE);
92+
Serial.print(" R AddPart() : Adding part. start_pos=");
93+
Serial.print(start_pos);
94+
Serial.print(" end_pos=");
95+
Serial.print(end_pos);
96+
Serial.print(" len=");
97+
Serial.print(len);
98+
Serial.print(" part_num=");
99+
Serial.print(part_num);
100+
Serial.print(" more_parts=");
101+
Serial.println(more_parts);
102+
if (data == NULL)
103+
data = malloc(end_pos);
104+
if (end_pos > data_used)
105+
data = realloc(data, end_pos);
106+
memcpy(&static_cast<byte*>(data)[start_pos], payload, len);
107+
if (end_pos > data_used)
108+
data_used = end_pos;
109+
blocks_recieved += 1;
110+
if (!more_parts) {
111+
header.split_more = false;
112+
header.split_part = part_num;
113+
}
114+
}
115+
116+
bool MeshBase::Message::IsDone() const
117+
{
118+
// We set the split_more to false if we recieved the last packet
119+
// in the stream, and split_part to total number of blocks in the stream.
120+
// So if split_more is false, and we have the right number of blocks_recieved
121+
// we are good to go.
122+
Serial.print(" R IsDone() : split_more=");
123+
Serial.print(header.split_more);
124+
Serial.print(" split_part=");
125+
Serial.print(header.split_part);
126+
Serial.print(" blocks_recieved=");
127+
Serial.println(blocks_recieved);
128+
if (!header.split_more && blocks_recieved >= header.split_part)
129+
return true;
130+
Serial.println(" R IsDone() : False");
131+
return false;
132+
}
133+
134+
MeshBase::Message::~Message() {
135+
free(data);
136+
}
137+
81138
void MeshBase::HandlePacket(const byte* data, uint8_t len)
82139
{
83-
if (len < sizeof(Message))
140+
if (len < sizeof(MessageHeader))
84141
return;
85-
const MeshBase::Message* msg = (struct MeshBase::Message*)data;
86-
uint8_t payload_length = len - sizeof(Message);
87-
const byte* payload = data + sizeof(Message);
88-
if (msg->split_more || msg->split_part != 0)
89-
{
90-
// Re-assembly needed
91-
// TODO: Re-assemble packets
92-
} else {
93-
switch(msg->type) {
142+
const MeshBase::MessageHeader* header = (struct MeshBase::MessageHeader*)data;
143+
uint8_t payload_length = len - sizeof(MessageHeader);
144+
const byte* payload = data + sizeof(MessageHeader);
145+
Message* s = assembly_list.Find<const MessageHeader*>(header, &FindStream);
146+
if (s == NULL) {
147+
s = new Message(*header);
148+
assembly_list.Add(s);
149+
}
150+
s->AddPart(payload, payload_length, header->split_part, header->split_more);
151+
if (s->IsDone()) {
152+
Serial.println(" R IsDone() : true!!");
153+
switch(header->type) {
94154
case type_peer_discovery:
95-
HandlePeerDiscovery(msg, payload, payload_length);
155+
HandlePeerDiscovery(&(s->header), s->data, s->data_used);
96156
break;
97157
default:
98-
OnMessage(msg, payload, payload_length);
158+
OnMessage(&(s->header), s->data, s->data_used);
99159
break;
100160
}
101-
delete data;
161+
assembly_list.Remove(s);
102162
}
103163
}
104164

105-
void MeshBase::HandlePeerDiscovery(const MeshBase::Message* msg, const void* buff, uint8_t length)
165+
void MeshBase::HandlePeerDiscovery(const MeshBase::MessageHeader* msg, const void* buff, uint8_t length)
106166
{
107167
if (length != sizeof(PeerDiscoveryMessage))
108168
return;
@@ -143,28 +203,36 @@ void MeshBase::SendPeerDiscovery()
143203

144204
void MeshBase::SendMessage(uint32_t to, uint8_t type, const void* data, uint8_t length, bool is_broadcast)
145205
{
206+
static uint8_t current_msg_id = 0;
146207
byte buff[MAX_PACKET_SIZE];
147-
Message* msg = (struct Message*)buff;
208+
MessageHeader* msg = (struct MessageHeader*)buff;
148209
msg->protocol_version = 1;
149210
msg->ttl = 0;
150211
msg->type = type;
151212
msg->address_from = address;
213+
msg->msg_id = current_msg_id++;
152214

153215
uint8_t num_pkts = (length / MAX_PAYLOAD_SIZE) + 1;
154216
for (uint8_t num = 0; num < num_pkts; ++num)
155217
{
156218
uint8_t remaining_length = length - (num * MAX_PAYLOAD_SIZE);
157219
msg->split_part = num;
158220
msg->split_more = remaining_length > MAX_PAYLOAD_SIZE;
159-
memcpy(buff + sizeof(Message), (const byte*)data + (num * MAX_PAYLOAD_SIZE), min(remaining_length, MAX_PAYLOAD_SIZE));
221+
memcpy(buff + sizeof(MessageHeader), (const byte*)data + (num * MAX_PAYLOAD_SIZE), min(remaining_length, MAX_PAYLOAD_SIZE));
160222

161223
radio.stopListening();
162224
if (is_broadcast)
163225
radio.openWritingPipe(TO_BROADCAST(to));
164226
else
165227
radio.openWritingPipe(TO_ADDRESS(to));
166-
radio.write(buff, min(remaining_length, MAX_PAYLOAD_SIZE));
228+
radio.write(buff, min(remaining_length + sizeof(MessageHeader), MAX_PAYLOAD_SIZE));
167229
radio.startListening();
230+
Serial.print(" T Sending pkt split_part=");
231+
Serial.print(msg->split_part);
232+
Serial.print(" split_more=");
233+
Serial.print(msg->split_more);
234+
Serial.print(" length=");
235+
Serial.println(min(remaining_length, MAX_PAYLOAD_SIZE));
168236
}
169237
}
170238

MeshBase.h

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ class MeshBase
1717
uint16_t time;
1818
Peer(uint32_t address) : address(address), time(0) {}
1919
};
20-
21-
struct Message
20+
21+
struct MessageHeader
2222
{
2323
uint8_t protocol_version : 4;
2424
uint8_t ttl : 4;
@@ -29,6 +29,20 @@ class MeshBase
2929
uint32_t address_from;
3030
} PACKED;
3131

32+
struct Message {
33+
Message(const MessageHeader& a) : header(a), data(NULL), data_used(0), blocks_recieved(0), next(0), prev(0) {}
34+
~Message();
35+
MessageHeader header;
36+
void* data;
37+
uint8_t data_used;
38+
uint8_t blocks_recieved;
39+
Message* next;
40+
Message* prev;
41+
42+
void AddPart(const void* data, uint8_t len, uint8_t part_num, bool more_parts);
43+
bool IsDone() const;
44+
};
45+
3246
// -- Message types --
3347
enum message_type {
3448
type_peer_discovery,
@@ -42,7 +56,7 @@ class MeshBase
4256
uint32_t GetAddress() const { return address; }
4357
bool IsReady() const { return address != 0; }
4458
protected:
45-
virtual void OnMessage(const MeshBase::Message* meta, const void* data, uint8_t length) = 0;
59+
virtual void OnMessage(const MessageHeader* meta, const void* data, uint8_t length) = 0;
4660
virtual void OnNewPeer(Peer*) {}
4761
virtual void OnLostPeer(Peer*) {}
4862
private:
@@ -53,14 +67,15 @@ class MeshBase
5367

5468
void SendPeerDiscovery();
5569
void SendMessage(uint32_t address, uint8_t type, const void* data, uint8_t length, bool is_broadcast);
56-
void HandlePeerDiscovery(const Message* msg, const void* buff, uint8_t length);
70+
void HandlePeerDiscovery(const MessageHeader* msg, const void* buff, uint8_t length);
5771
void HandlePacket(const byte* data, uint8_t length);
5872
void ChooseAddress();
59-
73+
6074
LinkedList<Peer> peers;
61-
75+
LinkedList2<Message> assembly_list;
76+
6277
Peer* GetPeer(uint32_t address);
63-
78+
6479
struct PeerDiscoveryMessage
6580
{
6681
uint8_t protocol_version;

RF_test.ino

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,19 @@ class App : public MeshBase
77
public:
88
App() : MeshBase(9, 10) {}
99
protected:
10-
virtual void OnMessage(const MeshBase::Message* meta, const void* data, uint8_t length)
10+
virtual void OnMessage(const MeshBase::MessageHeader* meta, const void* data, uint8_t length)
1111
{
1212
Serial.print(meta->address_from, DEC);
1313
Serial.print(" : ");
1414
Serial.println((const char*)data);
15+
Serial.print("split_part = ");
16+
Serial.println(meta->split_part, DEC);
1517
}
1618
virtual void OnNewPeer(Peer* p)
1719
{
1820
if (!IsReady()) return;
1921
char buff[255];
20-
int len = snprintf(buff, 255, "Hello %u", p->address);
22+
int len = snprintf(buff, 255, "Hello %u. This is a super long message for some reason. Please keep adding to this message for great good and other things. I would very much like it if this could be put together.", p->address);
2123
Serial.print("Me : ");
2224
Serial.println(buff);
2325
SendMessage(p->address, type_user, buff, len + 1);
@@ -39,3 +41,4 @@ void loop()
3941
app.Update();
4042
delay(100);
4143
}
44+

0 commit comments

Comments
 (0)