Subversion Repositories Projects

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
1702 - 1
#ifndef MAVLINKPROTOBUFMANAGER_HPP
2
#define MAVLINKPROTOBUFMANAGER_HPP
3
 
4
#include <deque>
5
#include <google/protobuf/message.h>
6
#include <iostream>
7
#include <tr1/memory>
8
 
9
#include <checksum.h>
10
#include <common/mavlink.h>
11
#include <mavlink_types.h>
12
#include <pixhawk/pixhawk.pb.h>
13
 
14
namespace mavlink
15
{
16
 
17
class ProtobufManager
18
{
19
public:
20
        ProtobufManager()
21
         : mRegisteredTypeCount(0)
22
         , mStreamID(0)
23
         , mVerbose(false)
24
         , kExtendedHeaderSize(MAVLINK_EXTENDED_HEADER_LEN)
25
         , kExtendedPayloadMaxSize(MAVLINK_MAX_EXTENDED_PAYLOAD_LEN)
26
        {
27
                // register GLOverlay
28
                {
29
                        std::tr1::shared_ptr<px::GLOverlay> msg(new px::GLOverlay);
30
                        registerType(msg);
31
                }
32
 
33
                // register ObstacleList
34
                {
35
                        std::tr1::shared_ptr<px::ObstacleList> msg(new px::ObstacleList);
36
                        registerType(msg);
37
                }
38
 
39
                // register ObstacleMap
40
                {
41
                        std::tr1::shared_ptr<px::ObstacleMap> msg(new px::ObstacleMap);
42
                        registerType(msg);
43
                }
44
 
45
                // register Path
46
                {
47
                        std::tr1::shared_ptr<px::Path> msg(new px::Path);
48
                        registerType(msg);
49
                }
50
 
51
                // register PointCloudXYZI
52
                {
53
                        std::tr1::shared_ptr<px::PointCloudXYZI> msg(new px::PointCloudXYZI);
54
                        registerType(msg);
55
                }
56
 
57
                // register PointCloudXYZRGB
58
                {
59
                        std::tr1::shared_ptr<px::PointCloudXYZRGB> msg(new px::PointCloudXYZRGB);
60
                        registerType(msg);
61
                }
62
 
63
                // register RGBDImage
64
                {
65
                        std::tr1::shared_ptr<px::RGBDImage> msg(new px::RGBDImage);
66
                        registerType(msg);
67
                }
68
 
69
                srand(time(NULL));
70
                mStreamID = rand() + 1;
71
        }
72
 
73
        bool fragmentMessage(uint8_t system_id, uint8_t component_id,
74
                                                 uint8_t target_system, uint8_t target_component,
75
                                                 const google::protobuf::Message& protobuf_msg,
76
                                                 std::vector<mavlink_extended_message_t>& fragments) const
77
        {
78
                TypeMap::const_iterator it = mTypeMap.find(protobuf_msg.GetTypeName());
79
                if (it == mTypeMap.end())
80
                {
81
                        std::cout << "# WARNING: Protobuf message with type "
82
                                          << protobuf_msg.GetTypeName() << " is not registered."
83
                                          << std::endl;
84
                        return false;
85
                }
86
 
87
                uint8_t typecode = it->second;
88
 
89
                std::string data = protobuf_msg.SerializeAsString();
90
 
91
                int fragmentCount = (protobuf_msg.ByteSize() + kExtendedPayloadMaxSize - 1) / kExtendedPayloadMaxSize;
92
                unsigned int offset = 0;
93
 
94
                for (int i = 0; i < fragmentCount; ++i)
95
                {
96
                        mavlink_extended_message_t fragment;                   
97
 
98
                        // write extended header data
99
                        uint8_t* payload = reinterpret_cast<uint8_t*>(fragment.base_msg.payload64);
100
                        unsigned int length = 0;
101
                        uint8_t flags = 0;
102
 
103
                        if (i < fragmentCount - 1)
104
                        {
105
                                length = kExtendedPayloadMaxSize;
106
                                flags |= 0x1;
107
                        }
108
                        else
109
                        {
110
                                length = protobuf_msg.ByteSize() - kExtendedPayloadMaxSize * (fragmentCount - 1);
111
                        }
112
 
113
                        memcpy(payload, &target_system, 1);
114
                        memcpy(payload + 1, &target_component, 1);
115
                        memcpy(payload + 2, &typecode, 1);
116
                        memcpy(payload + 3, &length, 4);
117
                        memcpy(payload + 7, &mStreamID, 2);
118
                        memcpy(payload + 9, &offset, 4);
119
                        memcpy(payload + 13, &flags, 1);
120
 
121
                        fragment.base_msg.msgid = MAVLINK_MSG_ID_EXTENDED_MESSAGE;
122
                        mavlink_finalize_message(&fragment.base_msg, system_id, component_id, kExtendedHeaderSize, 0);
123
 
124
                        // write extended payload data
125
                        fragment.extended_payload_len = length;
126
                        memcpy(fragment.extended_payload, &data[offset], length);
127
 
128
                        fragments.push_back(fragment);
129
                        offset += length;
130
                }
131
 
132
                if (mVerbose)
133
                {
134
                        std::cerr << "# INFO: Split extended message with size "
135
                                          << protobuf_msg.ByteSize() << " into "
136
                                          << fragmentCount << " fragments." << std::endl;
137
                }
138
 
139
                return true;
140
        }
141
 
142
        bool cacheFragment(mavlink_extended_message_t& msg)
143
        {
144
                if (!validFragment(msg))
145
                {
146
                        if (mVerbose)
147
                        {
148
                                std::cerr << "# WARNING: Message is not a valid fragment. "
149
                                                  << "Dropping message..." << std::endl;
150
                        }
151
                        return false;
152
                }
153
 
154
                // read extended header
155
                uint8_t* payload = reinterpret_cast<uint8_t*>(msg.base_msg.payload64);
156
                uint8_t typecode = 0;
157
                unsigned int length = 0;
158
                unsigned short streamID = 0;
159
                unsigned int offset = 0;
160
                uint8_t flags = 0;
161
 
162
                memcpy(&typecode, payload + 2, 1);
163
                memcpy(&length, payload + 3, 4);
164
                memcpy(&streamID, payload + 7, 2);
165
                memcpy(&offset, payload + 9, 4);
166
                memcpy(&flags, payload + 13, 1);
167
 
168
                if (typecode >= mTypeMap.size())
169
                {
170
                        std::cout << "# WARNING: Protobuf message with type code "
171
                                          << static_cast<int>(typecode) << " is not registered." << std::endl;
172
                        return false;
173
                }
174
 
175
                bool reassemble = false;
176
 
177
                FragmentQueue::iterator it = mFragmentQueue.find(streamID);
178
                if (it == mFragmentQueue.end())
179
                {
180
                        if (offset == 0)
181
                        {
182
                                mFragmentQueue[streamID].push_back(msg);
183
 
184
                                if ((flags & 0x1) != 0x1)
185
                                {
186
                                        reassemble = true;
187
                                }
188
 
189
                                if (mVerbose)
190
                                {
191
                                        std::cerr << "# INFO: Added fragment to new queue."
192
                                                          << std::endl;
193
                                }
194
                        }
195
                        else
196
                        {
197
                                if (mVerbose)
198
                                {
199
                                        std::cerr << "# WARNING: Message is not a valid fragment. "
200
                                                          << "Dropping message..." << std::endl;
201
                                }
202
                        }
203
                }
204
                else
205
                {
206
                        std::deque<mavlink_extended_message_t>& queue = it->second;
207
 
208
                        if (queue.empty())
209
                        {
210
                                if (offset == 0)
211
                                {
212
                                        queue.push_back(msg);
213
 
214
                                        if ((flags & 0x1) != 0x1)
215
                                        {
216
                                                reassemble = true;
217
                                        }
218
                                }
219
                                else
220
                                {
221
                                        if (mVerbose)
222
                                        {
223
                                                std::cerr << "# WARNING: Message is not a valid fragment. "
224
                                                                  << "Dropping message..." << std::endl;
225
                                        }
226
                                }
227
                        }
228
                        else
229
                        {
230
                                if (fragmentDataSize(queue.back()) + fragmentOffset(queue.back()) != offset)
231
                                {
232
                                        if (mVerbose)
233
                                        {
234
                                                std::cerr << "# WARNING: Previous fragment(s) have been lost. "
235
                                                                  << "Dropping message and clearing queue..." << std::endl;
236
                                        }
237
                                        queue.clear();
238
                                }
239
                                else
240
                                {
241
                                        queue.push_back(msg);
242
 
243
                                        if ((flags & 0x1) != 0x1)
244
                                        {
245
                                                reassemble = true;
246
                                        }
247
                                }
248
                        }
249
                }
250
 
251
                if (reassemble)
252
                {
253
                        std::deque<mavlink_extended_message_t>& queue = mFragmentQueue[streamID];
254
 
255
                        std::string data;
256
                        for (size_t i = 0; i < queue.size(); ++i)
257
                        {
258
                                mavlink_extended_message_t& mavlink_msg = queue.at(i);
259
 
260
                                data.append(reinterpret_cast<char*>(&mavlink_msg.extended_payload[0]),
261
                                                        static_cast<size_t>(mavlink_msg.extended_payload_len));
262
                        }
263
 
264
                        mMessages.at(typecode)->ParseFromString(data);
265
 
266
                        mMessageAvailable.at(typecode) = true;
267
 
268
                        queue.clear();
269
 
270
                        if (mVerbose)
271
                        {
272
                                std::cerr << "# INFO: Reassembled fragments for message with typename "
273
                                                  << mMessages.at(typecode)->GetTypeName() << " and size "
274
                                                  << mMessages.at(typecode)->ByteSize()
275
                                                  << "." << std::endl;
276
                        }
277
                }
278
 
279
                return true;
280
        }
281
 
282
        bool getMessage(std::tr1::shared_ptr<google::protobuf::Message>& msg)
283
        {
284
                for (size_t i = 0; i < mMessageAvailable.size(); ++i)
285
                {
286
                        if (mMessageAvailable.at(i))
287
                        {
288
                                msg = mMessages.at(i);
289
                                mMessageAvailable.at(i) = false;
290
 
291
                                return true;
292
                        }
293
                }
294
 
295
                return false;
296
        }
297
 
298
private:
299
        void registerType(const std::tr1::shared_ptr<google::protobuf::Message>& msg)
300
        {
301
                mTypeMap[msg->GetTypeName()] = mRegisteredTypeCount;
302
                ++mRegisteredTypeCount;
303
                mMessages.push_back(msg);
304
                mMessageAvailable.push_back(false);
305
        }
306
 
307
        bool validFragment(const mavlink_extended_message_t& msg) const
308
        {
309
                if (msg.base_msg.magic != MAVLINK_STX ||
310
                        msg.base_msg.len != kExtendedHeaderSize ||
311
                        msg.base_msg.msgid != MAVLINK_MSG_ID_EXTENDED_MESSAGE)
312
                {
313
                        return false;
314
                }
315
 
316
                uint16_t checksum;
317
                checksum = crc_calculate(reinterpret_cast<const uint8_t*>(&msg.base_msg.len), MAVLINK_CORE_HEADER_LEN);
318
                crc_accumulate_buffer(&checksum, reinterpret_cast<const char*>(&msg.base_msg.payload64), kExtendedHeaderSize);
319
#if MAVLINK_CRC_EXTRA
320
                static const uint8_t mavlink_message_crcs[256] = MAVLINK_MESSAGE_CRCS;
321
                crc_accumulate(mavlink_message_crcs[msg.base_msg.msgid], &checksum);
322
#endif
323
 
324
                if (mavlink_ck_a(&(msg.base_msg)) != (uint8_t)(checksum & 0xFF) &&
325
                    mavlink_ck_b(&(msg.base_msg)) != (uint8_t)(checksum >> 8))
326
                {
327
                        return false;
328
                }
329
 
330
                return true;
331
        }
332
 
333
        unsigned int fragmentDataSize(const mavlink_extended_message_t& msg) const
334
        {
335
                const uint8_t* payload = reinterpret_cast<const uint8_t*>(msg.base_msg.payload64);
336
 
337
                return *(reinterpret_cast<const unsigned int*>(payload + 3));
338
        }
339
 
340
        unsigned int fragmentOffset(const mavlink_extended_message_t& msg) const
341
        {
342
                const uint8_t* payload = reinterpret_cast<const uint8_t*>(msg.base_msg.payload64);
343
 
344
                return *(reinterpret_cast<const unsigned int*>(payload + 9));
345
        }
346
 
347
        int mRegisteredTypeCount;
348
        unsigned short mStreamID;
349
        bool mVerbose;
350
 
351
        typedef std::map<std::string, uint8_t> TypeMap;
352
        TypeMap mTypeMap;
353
        std::vector< std::tr1::shared_ptr<google::protobuf::Message> > mMessages;
354
        std::vector<bool> mMessageAvailable;
355
 
356
        typedef std::map<unsigned short, std::deque<mavlink_extended_message_t> > FragmentQueue;
357
        FragmentQueue mFragmentQueue;
358
 
359
        const int kExtendedHeaderSize;
360
        /**
361
         * Extended header structure
362
         * =========================
363
         *   byte 0 - target_system
364
         *   byte 1 - target_component
365
         *   byte 2 - extended message id (type code)
366
         *   bytes 3-6 - extended payload size in bytes
367
         *   byte 7-8 - stream ID
368
         *   byte 9-12 - fragment offset
369
         *   byte 13 - fragment flags (bit 0 - 1=more fragments, 0=last fragment)
370
         */
371
 
372
        const int kExtendedPayloadMaxSize;
373
};
374
 
375
}
376
 
377
#endif