Function dht_periodic

Synopsis

#include <dht.h>

int dht_periodic(const void *buf, size_t buflen, const struct sockaddr *from, int fromlen, time_t *tosleep, dht_callback_t *callback, void *closure)

Description

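Processes at most one received DHT packet and then runs whatever periodic maintenance is due. If buflen is greater than 0, buf is treated as an incoming message from the address from: the byte at buf[buflen] must be '\0', otherwise the function returns -1 with errno set to EINVAL. Replies, pings, find_node, get_peers and announce_peer messages are dispatched accordingly, and peer values found in get_peers replies are reported through callback as DHT_EVENT_VALUES / DHT_EVENT_VALUES6. If buflen is 0, only the maintenance runs. On normal completion the function stores in *tosleep the number of seconds after which it should be called again (0 meaning immediately) and returns 1.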

Mentioned in

Source

Lines 2032-2358 in dht.c. Line 43 in dht.h.

/* Handle at most one received packet, then run the periodic maintenance
   that has come due.

   buf, buflen: the received datagram.  When buflen is 0 no packet is
       processed and only the maintenance part runs.  When buflen > 0 the
       byte at buf[buflen] must be '\0' (so the buffer must hold at least
       buflen + 1 bytes); otherwise the call fails with EINVAL.
   from, fromlen: source address of the datagram.  Only dereferenced when
       buflen > 0.
   tosleep: output; set to the number of seconds until the next deadline
       (confirm_nodes_time or search_time, whichever is earlier), or 0 if
       a deadline has already passed.
   callback, closure: invoked with DHT_EVENT_VALUES / DHT_EVENT_VALUES6
       when a get_peers reply for a known search carries values; also
       forwarded to expire_searches and search_step.

   Returns 1 on normal completion.  Returns -1 with errno == EINVAL if the
   message is not NUL-terminated; in that case the maintenance part is
   skipped and *tosleep is left untouched.  Packets that are dropped for
   any other reason (martian or blacklisted source, parse failure, message
   from ourselves, rate limiting, truncated transaction id) still fall
   through to the maintenance part and return 1.

   NOTE(review): the last argument of new_node is 2 for nodes that replied
   to us, 1 for nodes that sent us a request and 0 for nodes we only heard
   about in a reply -- presumably a confidence level; confirm against
   new_node. */
int
dht_periodic(const void *buf, size_t buflen,
             const struct sockaddr *from, int fromlen,
             time_t *tosleep,
             dht_callback_t *callback, void *closure)
{
    dht_gettimeofday(&now, NULL);

    if(buflen > 0) {
        int message;
        struct parsed_message m;
        unsigned short ttid;

        if(is_martian(from))
            goto dontread;

        if(node_blacklisted(from, fromlen)) {
            debugf("Received packet from blacklisted node.\n");
            goto dontread;
        }

        /* The caller must NUL-terminate the buffer just past the message.
           This is the only failure that is reported to the caller. */
        if(((const char*)buf)[buflen] != '\0') {
            debugf("Unterminated message.\n");
            errno = EINVAL;
            return -1;
        }

        memset(&m, 0, sizeof(m));
        message = parse_message(buf, buflen, &m);

        if(message < 0 || message == ERROR || id_cmp(m.id, zeroes) == 0) {
            debugf("Unparseable message: ");
            debug_printable(buf, buflen);
            debugf("\n");
            goto dontread;
        }

        if(id_cmp(m.id, myid) == 0) {
            debugf("Received message from self.\n");
            goto dontread;
        }

        if(message > REPLY) {
            /* Rate limit requests. */
            if(!token_bucket()) {
                debugf("Dropping request due to rate limiting.\n");
                goto dontread;
            }
        }

        switch(message) {
        case REPLY:
            if(m.tid_len != 4) {
                debugf("Broken node truncates transaction ids: ");
                debug_printable(buf, buflen);
                debugf("\n");
                /* This is really annoying, as it means that we will
                   time-out all our searches that go through this node.
                   Kill it. */
                blacklist_node(m.id, from, fromlen);
                goto dontread;
            }
            /* The transaction id prefix tells us which of our requests
               this is a reply to. */
            if(tid_match(m.tid, "pn", NULL)) {
                debugf("Pong!\n");
                new_node(m.id, from, fromlen, 2);
            } else if(tid_match(m.tid, "fn", NULL) ||
                      tid_match(m.tid, "gp", NULL)) {
                int gp = 0;
                struct search *sr = NULL;
                if(tid_match(m.tid, "gp", &ttid)) {
                    gp = 1;
                    sr = find_search(ttid, from->sa_family);
                }
                debugf("Nodes found (%d+%d)%s!\n",
                       m.nodes_len/26, m.nodes6_len/38,
                       gp ? " for get_peers" : "");
                if(m.nodes_len % 26 != 0 || m.nodes6_len % 38 != 0) {
                    debugf("Unexpected length for node info!\n");
                    blacklist_node(m.id, from, fromlen);
                } else if(gp && sr == NULL) {
                    debugf("Unknown search!\n");
                    new_node(m.id, from, fromlen, 1);
                } else {
                    int i;
                    new_node(m.id, from, fromlen, 2);
                    /* IPv4 node entries are 26 bytes each: a 20-byte id,
                       a 4-byte address and a 2-byte port.  Address and
                       port are copied verbatim into the sockaddr. */
                    for(i = 0; i < m.nodes_len / 26; i++) {
                        unsigned char *ni = m.nodes + i * 26;
                        struct sockaddr_in sin;
                        if(id_cmp(ni, myid) == 0)
                            continue;
                        memset(&sin, 0, sizeof(sin));
                        sin.sin_family = AF_INET;
                        memcpy(&sin.sin_addr, ni + 20, 4);
                        memcpy(&sin.sin_port, ni + 24, 2);
                        new_node(ni, (struct sockaddr*)&sin, sizeof(sin), 0);
                        if(sr && sr->af == AF_INET) {
                            insert_search_node(ni,
                                               (struct sockaddr*)&sin,
                                               sizeof(sin),
                                               sr, 0, NULL, 0);
                        }
                    }
                    /* IPv6 node entries are 38 bytes each: a 20-byte id,
                       a 16-byte address and a 2-byte port. */
                    for(i = 0; i < m.nodes6_len / 38; i++) {
                        unsigned char *ni = m.nodes6 + i * 38;
                        struct sockaddr_in6 sin6;
                        if(id_cmp(ni, myid) == 0)
                            continue;
                        memset(&sin6, 0, sizeof(sin6));
                        sin6.sin6_family = AF_INET6;
                        memcpy(&sin6.sin6_addr, ni + 20, 16);
                        memcpy(&sin6.sin6_port, ni + 36, 2);
                        new_node(ni, (struct sockaddr*)&sin6, sizeof(sin6), 0);
                        if(sr && sr->af == AF_INET6) {
                            insert_search_node(ni,
                                               (struct sockaddr*)&sin6,
                                               sizeof(sin6),
                                               sr, 0, NULL, 0);
                        }
                    }
                    if(sr) {
                        /* Since we received a reply, the number of
                           requests in flight has decreased.  Let's push
                           another request. */
                        search_send_get_peers(sr, NULL);
                    }
                }
                /* Runs for every get_peers reply that matches a known
                   search, including the malformed-node-info case above:
                   record the replying node together with its token, and
                   hand any returned values to the callback. */
                if(sr) {
                    insert_search_node(m.id, from, fromlen, sr,
                                       1, m.token, m.token_len);
                    if(m.values_len > 0 || m.values6_len > 0) {
                        debugf("Got values (%d+%d)!\n",
                               m.values_len / 6, m.values6_len / 18);
                        if(callback) {
                            if(m.values_len > 0)
                                (*callback)(closure, DHT_EVENT_VALUES, sr->id,
                                            (void*)m.values, m.values_len);

                            if(m.values6_len > 0)
                                (*callback)(closure, DHT_EVENT_VALUES6, sr->id,
                                            (void*)m.values6, m.values6_len);
                        }
                    }
                }
            } else if(tid_match(m.tid, "ap", &ttid)) {
                struct search *sr;
                debugf("Got reply to announce_peer.\n");
                sr = find_search(ttid, from->sa_family);
                if(!sr) {
                    debugf("Unknown search!\n");
                    new_node(m.id, from, fromlen, 1);
                } else {
                    int i;
                    new_node(m.id, from, fromlen, 2);
                    /* Mark the replying node as having acknowledged our
                       announce within this search. */
                    for(i = 0; i < sr->numnodes; i++)
                        if(id_cmp(sr->nodes[i].id, m.id) == 0) {
                            sr->nodes[i].request_time = 0;
                            sr->nodes[i].reply_time = now.tv_sec;
                            sr->nodes[i].acked = 1;
                            sr->nodes[i].pinged = 0;
                            break;
                        }
                    /* See comment for gp above. */
                    search_send_get_peers(sr, NULL);
                }
            } else {
                debugf("Unexpected reply: ");
                debug_printable(buf, buflen);
                debugf("\n");
            }
            break;
        case PING:
            debugf("Ping (%d)!\n", m.tid_len);
            new_node(m.id, from, fromlen, 1);
            debugf("Sending pong.\n");
            send_pong(from, fromlen, m.tid, m.tid_len);
            break;
        case FIND_NODE:
            debugf("Find node!\n");
            new_node(m.id, from, fromlen, 1);
            debugf("Sending closest nodes (%d).\n", m.want);
            send_closest_nodes(from, fromlen,
                               m.tid, m.tid_len, m.target, m.want,
                               0, NULL, NULL, 0);
            break;
        case GET_PEERS:
            debugf("Get_peers!\n");
            new_node(m.id, from, fromlen, 1);
            /* m was zeroed before parsing, so an all-zero info_hash means
               the field was absent (or literally zero). */
            if(id_cmp(m.info_hash, zeroes) == 0) {
                debugf("Eek!  Got get_peers with no info_hash.\n");
                send_error(from, fromlen, m.tid, m.tid_len,
                           203, "Get_peers with no info_hash");
                break;
            } else {
                struct storage *st = find_storage(m.info_hash);
                unsigned char token[TOKEN_SIZE];
                make_token(from, 0, token);
                if(st && st->numpeers > 0) {
                     debugf("Sending found%s peers.\n",
                            from->sa_family == AF_INET6 ? " IPv6" : "");
                     send_closest_nodes(from, fromlen,
                                        m.tid, m.tid_len,
                                        m.info_hash, m.want,
                                        from->sa_family, st,
                                        token, TOKEN_SIZE);
                } else {
                    debugf("Sending nodes for get_peers.\n");
                    send_closest_nodes(from, fromlen,
                                       m.tid, m.tid_len, m.info_hash, m.want,
                                       0, NULL, token, TOKEN_SIZE);
                }
            }
            break;
        case ANNOUNCE_PEER:
            debugf("Announce peer!\n");
            new_node(m.id, from, fromlen, 1);
            if(id_cmp(m.info_hash, zeroes) == 0) {
                debugf("Announce_peer with no info_hash.\n");
                send_error(from, fromlen, m.tid, m.tid_len,
                           203, "Announce_peer with no info_hash");
                break;
            }
            if(!token_match(m.token, m.token_len, from)) {
                debugf("Incorrect token for announce_peer.\n");
                send_error(from, fromlen, m.tid, m.tid_len,
                           203, "Announce_peer with wrong token");
                break;
            }
            if(m.implied_port != 0) {
                /* Do this even if port > 0.  That's what the spec says. */
                /* sin_port/sin6_port are in network byte order, so the
                   conversion to apply is ntohs.  For any other address
                   family the announced port is left as parsed. */
                switch(from->sa_family) {
                case AF_INET:
                    m.port =
                        ntohs(((const struct sockaddr_in*)from)->sin_port);
                    break;
                case AF_INET6:
                    m.port =
                        ntohs(((const struct sockaddr_in6*)from)->sin6_port);
                    break;
                }
            }
            if(m.port == 0) {
                debugf("Announce_peer with forbidden port %d.\n", m.port);
                send_error(from, fromlen, m.tid, m.tid_len,
                           203, "Announce_peer with forbidden port number");
                break;
            }
            storage_store(m.info_hash, from, m.port);
            /* Note that if storage_store failed, we lie to the requestor.
               This is to prevent them from backtracking, and hence
               polluting the DHT. */
            debugf("Sending peer announced.\n");
            send_peer_announced(from, fromlen, m.tid, m.tid_len);
        }
    }

    /* Periodic maintenance.  Reached whether or not a packet was read. */
 dontread:
    if(now.tv_sec >= rotate_secrets_time)
        rotate_secrets();

    if(now.tv_sec >= expire_stuff_time) {
        expire_buckets(buckets);
        expire_buckets(buckets6);
        expire_storage();
        expire_searches(callback, closure);
    }

    if(search_time > 0 && now.tv_sec >= search_time) {
        struct search *sr;
        /* Step every unfinished search whose last step is more than
           DHT_SEARCH_RETRANSMIT / 2 seconds old. */
        sr = searches;
        while(sr) {
            if(!sr->done &&
               sr->step_time + DHT_SEARCH_RETRANSMIT / 2 + 1 <= now.tv_sec) {
                search_step(sr, callback, closure);
            }
            sr = sr->next;
        }

        search_time = 0;

        /* Recompute search_time as the earliest randomised retransmit
           deadline over all unfinished searches; it stays 0 when there
           are none. */
        sr = searches;
        while(sr) {
            if(!sr->done) {
                time_t tm = sr->step_time +
                    DHT_SEARCH_RETRANSMIT + random() % DHT_SEARCH_RETRANSMIT;
                if(search_time == 0 || search_time > tm)
                    search_time = tm;
            }
            sr = sr->next;
        }
    }

    if(now.tv_sec >= confirm_nodes_time) {
        int soon = 0;

        soon |= bucket_maintenance(AF_INET);
        soon |= bucket_maintenance(AF_INET6);

        /* Neighbourhood maintenance is only attempted when bucket
           maintenance had nothing to do, and only for a family whose
           grow time lies within the last 150 seconds. */
        if(!soon) {
            if(mybucket_grow_time >= now.tv_sec - 150)
                soon |= neighbourhood_maintenance(AF_INET);
            if(mybucket6_grow_time >= now.tv_sec - 150)
                soon |= neighbourhood_maintenance(AF_INET6);
        }

        /* Given the timeouts in bucket_maintenance, with a 22-bucket
           table, worst case is a ping every 18 seconds (22 buckets plus
           11 buckets overhead for the larger buckets).  Keep the "soon"
           case within 15 seconds, which gives some margin for neighbourhood
           maintenance. */

        if(soon)
            confirm_nodes_time = now.tv_sec + 5 + random() % 10;
        else
            confirm_nodes_time = now.tv_sec + 60 + random() % 120;
    }

    /* Tell the caller how long it may sleep: until the earlier of the
       node-confirmation deadline and the search deadline (if any). */
    if(confirm_nodes_time > now.tv_sec)
        *tosleep = confirm_nodes_time - now.tv_sec;
    else
        *tosleep = 0;

    if(search_time > 0) {
        if(search_time <= now.tv_sec)
            *tosleep = 0;
        else if(*tosleep > search_time - now.tv_sec)
            *tosleep = search_time - now.tv_sec;
    }

    return 1;
}





Add Discussion as Guest

Log in