48 #include "lib/random.h"
49 #include "sys/node-id.h"
64 #define PRINTF(...) printf(__VA_ARGS__)
70 static struct broadcast_conn deluge_broadcast;
71 static struct unicast_conn deluge_uc;
72 static struct deluge_object current_object;
73 static process_event_t deluge_event;
76 static int deluge_state;
77 static int old_summary;
78 static int neighbor_inconsistency;
79 static unsigned r_interval;
80 static unsigned recv_adv;
81 static int broadcast_profile;
84 static struct ctimer rx_timer;
85 static struct ctimer tx_timer;
86 static struct ctimer summary_timer;
87 static struct ctimer profile_timer;
91 static deluge_object_id_t next_object_id;
94 static void broadcast_recv(
struct broadcast_conn *,
const linkaddr_t *);
95 static void unicast_recv(
struct unicast_conn *,
const linkaddr_t *);
98 static const struct unicast_callbacks unicast_call = {unicast_recv,
NULL};
101 PROCESS(deluge_process,
"Deluge");
104 transition(
int state)
106 if(state != deluge_state) {
107 switch(deluge_state) {
108 case DELUGE_STATE_MAINTAIN:
112 case DELUGE_STATE_RX:
115 case DELUGE_STATE_TX:
119 deluge_state = state;
124 write_page(
struct deluge_object *obj,
unsigned pagenum,
unsigned char *data)
128 offset = pagenum * S_PAGE;
133 return cfs_write(obj->cfs_fd, (
char *)data, S_PAGE);
137 read_page(
struct deluge_object *obj,
unsigned pagenum,
unsigned char *buf)
141 offset = pagenum * S_PAGE;
146 return cfs_read(obj->cfs_fd, (
char *)buf, S_PAGE);
150 init_page(
struct deluge_object *obj,
int pagenum,
int have)
152 struct deluge_page *page;
153 unsigned char buf[S_PAGE];
155 page = &obj->pages[pagenum];
158 page->last_request = 0;
162 page->version = obj->version;
163 page->packet_set = ALL_PACKETS;
164 page->flags |= PAGE_COMPLETE;
165 read_page(obj, pagenum, buf);
169 page->packet_set = 0;
174 file_size(
const char *file)
181 return (cfs_offset_t)-1;
191 init_object(
struct deluge_object *obj,
char *filename,
unsigned version)
193 static struct deluge_page *page;
197 if(obj->cfs_fd < 0) {
201 obj->filename = filename;
202 obj->object_id = next_object_id++;
203 obj->size = file_size(filename);
204 obj->version = obj->update_version = version;
205 obj->current_rx_page = 0;
209 obj->pages = malloc(OBJECT_PAGE_COUNT(*obj) *
sizeof(*obj->pages));
210 if(obj->pages ==
NULL) {
215 for(i = 0; i < OBJECT_PAGE_COUNT(current_object); i++) {
216 page = ¤t_object.pages[i];
217 init_page(¤t_object, i, 1);
220 memset(obj->current_page, 0,
sizeof(obj->current_page));
226 highest_available_page(
struct deluge_object *obj)
230 for(i = 0; i < OBJECT_PAGE_COUNT(*obj); i++) {
231 if(!(obj->pages[i].flags & PAGE_COMPLETE)) {
240 send_request(
void *arg)
242 struct deluge_object *obj;
243 struct deluge_msg_request request;
245 obj = (
struct deluge_object *)arg;
247 request.cmd = DELUGE_CMD_REQUEST;
248 request.pagenum = obj->current_rx_page;
249 request.version = obj->pages[request.pagenum].version;
250 request.request_set = ~obj->pages[obj->current_rx_page].packet_set;
251 request.object_id = obj->object_id;
253 PRINTF(
"Sending request for page %d, version %u, request_set %u\n",
254 request.pagenum, request.version, request.request_set);
256 unicast_send(&deluge_uc, &obj->summary_from);
259 if(++obj->nrequests == CONST_LAMBDA) {
262 transition(DELUGE_STATE_MAINTAIN);
269 advertise_summary(
struct deluge_object *obj)
271 struct deluge_msg_summary summary;
273 if(recv_adv >= CONST_K) {
278 summary.cmd = DELUGE_CMD_SUMMARY;
279 summary.version = obj->update_version;
280 summary.highest_available = highest_available_page(obj);
281 summary.object_id = obj->object_id;
283 PRINTF(
"Advertising summary for object id %u: version=%u, available=%u\n",
284 (
unsigned)obj->object_id, summary.version, summary.highest_available);
291 handle_summary(
struct deluge_msg_summary *msg,
const linkaddr_t *sender)
293 int highest_available, i;
294 clock_time_t oldest_request, oldest_data, now;
295 struct deluge_page *page;
297 highest_available = highest_available_page(¤t_object);
299 if(msg->version != current_object.version ||
300 msg->highest_available != highest_available) {
301 neighbor_inconsistency = 1;
306 if(msg->version < current_object.version) {
308 broadcast_profile = 1;
312 if(msg->version == current_object.update_version &&
313 msg->highest_available > highest_available) {
314 if(msg->highest_available > OBJECT_PAGE_COUNT(current_object)) {
315 PRINTF(
"Error: highest available is above object page count!\n");
319 oldest_request = oldest_data = now =
clock_time();
320 for(i = 0; i < msg->highest_available; i++) {
321 page = ¤t_object.pages[i];
322 if(page->last_request < oldest_request) {
323 oldest_request = page->last_request;
325 if(page->last_request < oldest_data) {
326 oldest_data = page->last_data;
330 if(((now - oldest_request) /
CLOCK_SECOND) <= 2 * r_interval ||
336 transition(DELUGE_STATE_RX);
340 CONST_OMEGA * ESTIMATED_TX_TIME + ((
unsigned)
random_rand() % T_R),
341 send_request, ¤t_object);
347 send_page(
struct deluge_object *obj,
unsigned pagenum)
349 unsigned char buf[S_PAGE];
350 struct deluge_msg_packet pkt;
353 pkt.cmd = DELUGE_CMD_PACKET;
354 pkt.pagenum = pagenum;
355 pkt.version = obj->pages[pagenum].version;
357 pkt.object_id = obj->object_id;
360 read_page(obj, pagenum, buf);
363 for(cp = buf; cp + S_PKT <= (
unsigned char *)&buf[S_PAGE]; cp += S_PKT) {
364 if(obj->tx_set & (1 << pkt.packetnum)) {
366 memcpy(pkt.payload, cp, S_PKT);
376 tx_callback(
void *arg)
378 struct deluge_object *obj;
380 obj = (
struct deluge_object *)arg;
381 if(obj->current_tx_page >= 0 && obj->tx_set) {
382 send_page(obj, obj->current_tx_page);
385 packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE,
386 PACKETBUF_ATTR_PACKET_TYPE_STREAM);
389 packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE,
390 PACKETBUF_ATTR_PACKET_TYPE_STREAM_END);
391 obj->current_tx_page = -1;
392 transition(DELUGE_STATE_MAINTAIN);
398 handle_request(
struct deluge_msg_request *msg)
400 int highest_available;
402 if(msg->pagenum >= OBJECT_PAGE_COUNT(current_object)) {
406 if(msg->version != current_object.version) {
407 neighbor_inconsistency = 1;
410 highest_available = highest_available_page(¤t_object);
413 if(msg->version == current_object.version &&
414 msg->pagenum <= highest_available) {
415 current_object.pages[msg->pagenum].last_request =
clock_time();
418 if(msg->pagenum == current_object.current_tx_page) {
419 current_object.tx_set |= msg->request_set;
421 current_object.current_tx_page = msg->pagenum;
422 current_object.tx_set = msg->request_set;
425 transition(DELUGE_STATE_TX);
431 handle_packet(
struct deluge_msg_packet *msg)
433 struct deluge_page *page;
435 struct deluge_msg_packet packet;
437 memcpy(&packet, msg,
sizeof(packet));
439 PRINTF(
"Incoming packet for object id %u, version %u, page %u, packet num %u!\n",
440 (
unsigned)packet.object_id, (
unsigned)packet.version,
441 (
unsigned)packet.pagenum, (
unsigned)packet.packetnum);
443 if(packet.pagenum != current_object.current_rx_page) {
447 if(packet.version != current_object.version) {
448 neighbor_inconsistency = 1;
451 page = ¤t_object.pages[packet.pagenum];
452 if(packet.version == page->version && !(page->flags & PAGE_COMPLETE)) {
453 memcpy(¤t_object.current_page[S_PKT * packet.packetnum],
454 packet.payload, S_PKT);
457 if(packet.crc != crc) {
458 PRINTF(
"packet crc: %hu, calculated crc: %hu\n", packet.crc, crc);
463 page->packet_set |= (1 << packet.packetnum);
465 if(page->packet_set == ALL_PACKETS) {
467 packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE,
468 PACKETBUF_ATTR_PACKET_TYPE_STREAM_END);
470 write_page(¤t_object, packet.pagenum, current_object.current_page);
471 page->version = packet.version;
472 page->flags = PAGE_COMPLETE;
473 PRINTF(
"Page %u completed\n", packet.pagenum);
475 current_object.current_rx_page++;
477 if(packet.pagenum == OBJECT_PAGE_COUNT(current_object) - 1) {
478 current_object.version = current_object.update_version;
480 PRINTF(
"Update completed for object %u, version %u\n",
481 (
unsigned)current_object.object_id, packet.version);
482 }
else if(current_object.current_rx_page < OBJECT_PAGE_COUNT(current_object)) {
485 CONST_OMEGA * ESTIMATED_TX_TIME + (
random_rand() % T_R),
486 send_request, ¤t_object);
490 transition(DELUGE_STATE_MAINTAIN);
493 packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE,
494 PACKETBUF_ATTR_PACKET_TYPE_STREAM);
500 send_profile(
struct deluge_object *obj)
502 struct deluge_msg_profile *msg;
503 unsigned char buf[
sizeof(*msg) + OBJECT_PAGE_COUNT(*obj)];
506 if(broadcast_profile && recv_adv < CONST_K) {
507 broadcast_profile = 0;
509 msg = (
struct deluge_msg_profile *)buf;
510 msg->cmd = DELUGE_CMD_PROFILE;
511 msg->version = obj->version;
512 msg->npages = OBJECT_PAGE_COUNT(*obj);
513 msg->object_id = obj->object_id;
514 for(i = 0; i < msg->npages; i++) {
515 msg->version_vector[i] = obj->pages[i].version;
524 handle_profile(
struct deluge_msg_profile *msg)
528 struct deluge_object *obj;
531 obj = ¤t_object;
532 if(msg->version <= current_object.update_version) {
536 PRINTF(
"Received profile of version %u with a vector of %u pages.\n",
537 msg->version, msg->npages);
540 current_object.tx_set = 0;
542 npages = OBJECT_PAGE_COUNT(*obj);
543 obj->size = msg->npages * S_PAGE;
545 p = malloc(OBJECT_PAGE_COUNT(*obj) *
sizeof(*obj->pages));
547 PRINTF(
"Failed to reallocate memory for pages!\n");
551 memcpy(p, obj->pages, npages *
sizeof(*obj->pages));
553 obj->pages = (
struct deluge_page *)p;
555 if(msg->npages < npages) {
556 npages = msg->npages;
559 for(i = 0; i < npages; i++) {
560 if(msg->version_vector[i] > obj->pages[i].version) {
561 obj->pages[i].packet_set = 0;
562 obj->pages[i].flags &= ~PAGE_COMPLETE;
563 obj->pages[i].version = msg->version_vector[i];
567 for(; i < msg->npages; i++) {
568 init_page(obj, i, 0);
571 obj->current_rx_page = highest_available_page(obj);
572 obj->update_version = msg->version;
574 transition(DELUGE_STATE_RX);
577 CONST_OMEGA * ESTIMATED_TX_TIME + ((
unsigned)
random_rand() % T_R),
582 command_dispatcher(
const linkaddr_t *sender)
586 struct deluge_msg_profile *profile;
594 case DELUGE_CMD_SUMMARY:
595 if(len >=
sizeof(
struct deluge_msg_summary))
596 handle_summary((
struct deluge_msg_summary *)msg, sender);
598 case DELUGE_CMD_REQUEST:
599 if(len >=
sizeof(
struct deluge_msg_request))
600 handle_request((
struct deluge_msg_request *)msg);
602 case DELUGE_CMD_PACKET:
603 if(len >=
sizeof(
struct deluge_msg_packet))
604 handle_packet((
struct deluge_msg_packet *)msg);
606 case DELUGE_CMD_PROFILE:
607 profile = (
struct deluge_msg_profile *)msg;
608 if(len >=
sizeof(*profile) &&
609 len >=
sizeof(*profile) + profile->npages * profile->version_vector[0])
610 handle_profile((
struct deluge_msg_profile *)msg);
613 PRINTF(
"Incoming packet with unknown command: %d\n", msg[0]);
618 unicast_recv(
struct unicast_conn *c,
const linkaddr_t *sender)
620 command_dispatcher(sender);
624 broadcast_recv(
struct broadcast_conn *c,
const linkaddr_t *sender)
626 command_dispatcher(sender);
630 deluge_disseminate(
char *file,
unsigned version)
633 if(next_object_id > 0 || init_object(¤t_object, file, version) < 0) {
644 static unsigned time_counter;
645 static unsigned r_rand;
653 broadcast_open(&deluge_broadcast, DELUGE_BROADCAST_CHANNEL, &broadcast_call);
654 unicast_open(&deluge_uc, DELUGE_UNICAST_CHANNEL, &unicast_call);
657 PRINTF(
"Maintaining state for object %s of %d pages\n",
658 current_object.filename, OBJECT_PAGE_COUNT(current_object));
660 deluge_state = DELUGE_STATE_MAINTAIN;
662 for(r_interval = T_LOW;;) {
663 if(neighbor_inconsistency) {
666 neighbor_inconsistency = 0;
669 r_interval = (2 * r_interval >= T_HIGH) ? T_HIGH : 2 * r_interval;
672 r_rand = r_interval / 2 + ((unsigned)
random_rand() % (r_interval / 2));
678 (
void *)(
void *)advertise_summary, ¤t_object);
681 ctimer_set(&profile_timer, r_rand * CLOCK_SECOND,
682 (
void *)(
void *)send_profile, ¤t_object);
684 LONG_TIMER(et, time_counter, r_interval);
688 unicast_close(&deluge_uc);
690 if(current_object.cfs_fd >= 0) {
693 if(current_object.pages !=
NULL) {
694 free(current_object.pages);
int packetbuf_copyfrom(const void *from, uint16_t len)
Copy from external data into the packetbuf.
void broadcast_close(struct broadcast_conn *c)
Close a broadcast connection.
int cfs_open(const char *name, int flags)
Open a file.
#define LEDS_RED
LED1 (Red) -> PC0.
cfs_offset_t cfs_seek(int fd, cfs_offset_t offset, int whence)
Seek to a specified position in an open file.
#define PROCESS_BEGIN()
Define the beginning of a process.
#define CFS_WRITE
Specify that cfs_open() should open a file for writing.
int ctimer_expired(struct ctimer *c)
Check if a callback timer has expired.
#define NULL
The null pointer.
#define CFS_SEEK_END
Specify that cfs_seek() should compute the offset from the end of the file.
process_event_t process_alloc_event(void)
Allocate a global event number.
#define CFS_READ
Specify that cfs_open() should open a file for reading.
Header file for the Rime stack
uint16_t packetbuf_datalen(void)
Get the length of the data in the packetbuf.
void linkaddr_copy(linkaddr_t *dest, const linkaddr_t *src)
Copy a Rime address.
#define PROCESS_THREAD(name, ev, data)
Define the body of a process.
#define PROCESS_END()
Define the end of a process.
CCIF clock_time_t clock_time(void)
Get the current clock time.
void ctimer_set(struct ctimer *c, clock_time_t t, void(*f)(void *), void *ptr)
Set a callback timer.
#define PROCESS_EXITHANDLER(handler)
Specify an action when a process exits.
Callback structure for broadcast.
void ctimer_reset(struct ctimer *c)
Reset a callback timer with the same interval as was previously set.
#define PROCESS(name, strname)
Declare a process.
int broadcast_send(struct broadcast_conn *c)
Send an identified best-effort broadcast packet.
void process_start(struct process *p, process_data_t data)
Start a process.
#define CFS_SEEK_SET
Specify that cfs_seek() should compute the offset from the beginning of the file. ...
Header file for the Contiki ELF loader.
void * packetbuf_dataptr(void)
Get a pointer to the data in the packetbuf.
void broadcast_open(struct broadcast_conn *c, uint16_t channel, const struct broadcast_callbacks *u)
Set up an identified best-effort broadcast connection.
Header file for the CRC16 calculcation
void ctimer_stop(struct ctimer *c)
Stop a pending callback timer.
unsigned short random_rand(void)
Generate the next state and return the upper part of it.
unsigned short crc16_data(const unsigned char *data, int len, unsigned short acc)
Calculate the CRC16 over a data area.
void cfs_close(int fd)
Close an open file.
#define CLOCK_SECOND
A second, measured in system clock time.