1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
use std::{
    collections::{HashMap, HashSet},
    fmt::{Display, Error as FmtError, Formatter},
};

use futures::channel::mpsc;

use crate::{metrics::Metrics, Data, PeerId, PublicKey};

mod direction;
use direction::DirectedPeers;

/// Reasons why handing data to the `Manager` for delivery can fail.
#[derive(Debug, PartialEq, Eq)]
pub enum SendError {
    /// The channel towards the peer's connection no longer accepts data.
    ConnectionClosed,
    /// The manager holds no connection for the requested peer.
    PeerNotFound,
}

impl Display for SendError {
    /// Writes a short, fixed, human-readable description of the error.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        let description = match self {
            SendError::ConnectionClosed => "worker dead",
            SendError::PeerNotFound => "peer not found",
        };
        f.write_str(description)
    }
}

/// Possible results of adding connections.
///
/// This is the return type of `Manager::add_connection`.
#[derive(Debug, PartialEq, Eq)]
pub enum AddResult {
    /// We do not want to maintain a connection with this peer, so the offered
    /// connection was not stored.
    Uninterested,
    /// Connection added; there was no previously stored connection for this peer.
    Added,
    /// Old connection replaced with new one; the previously stored sender is discarded.
    Replaced,
}

/// A snapshot of the connection state of a `Manager`, meant to be displayed.
///
/// For each direction the wanted peers are split into those with an active
/// connection at the time the snapshot was taken and those without one.
pub struct ManagerStatus<PK: PublicKey + PeerId> {
    // Wanted outgoing peers that had an active connection.
    outgoing_peers: HashSet<PK>,
    // Wanted outgoing peers that had no active connection.
    missing_outgoing: HashSet<PK>,
    // Wanted incoming peers that had an active connection.
    incoming_peers: HashSet<PK>,
    // Wanted incoming peers that had no active connection.
    missing_incoming: HashSet<PK>,
}

impl<PK: PublicKey + PeerId> ManagerStatus<PK> {
    /// Builds a status snapshot from the current state of `manager`.
    ///
    /// For each direction, every wanted peer is placed in the "have" set when the
    /// manager reports an active connection for it, and in the "missing" set otherwise.
    fn new<A: Data, D: Data>(manager: &Manager<PK, A, D>) -> Self {
        let mut status = Self {
            outgoing_peers: HashSet::new(),
            missing_outgoing: HashSet::new(),
            incoming_peers: HashSet::new(),
            missing_incoming: HashSet::new(),
        };

        for peer in manager.wanted.incoming_peers() {
            // Pick the destination set first, then insert exactly once.
            let bucket = if manager.active_connection(peer) {
                &mut status.incoming_peers
            } else {
                &mut status.missing_incoming
            };
            bucket.insert(peer.clone());
        }

        for peer in manager.wanted.outgoing_peers() {
            let bucket = if manager.active_connection(peer) {
                &mut status.outgoing_peers
            } else {
                &mut status.missing_outgoing
            };
            bucket.insert(peer.clone());
        }

        status
    }

    /// Total number of incoming peers we want, connected or not.
    ///
    /// Panics if the sum overflows `usize`.
    fn wanted_incoming(&self) -> usize {
        let connected = self.incoming_peers.len();
        let missing = self.missing_incoming.len();
        connected.checked_add(missing).unwrap()
    }

    /// Total number of outgoing peers we want, connected or not.
    ///
    /// Panics if the sum overflows `usize`.
    fn wanted_outgoing(&self) -> usize {
        let connected = self.outgoing_peers.len();
        let missing = self.missing_outgoing.len();
        connected.checked_add(missing).unwrap()
    }
}

/// Renders a set of peer ids as their short string forms separated by `", "`.
///
/// The order follows the set's iteration order; an empty set yields an empty string.
fn pretty_peer_id_set<PK: PublicKey + PeerId>(set: &HashSet<PK>) -> String {
    let mut rendered = String::new();
    for (position, peer_id) in set.iter().enumerate() {
        // The separator goes between entries only, never before the first one.
        if position > 0 {
            rendered.push_str(", ");
        }
        rendered.push_str(&peer_id.to_short_string());
    }
    rendered
}

impl<PK: PublicKey + PeerId> Display for ManagerStatus<PK> {
    /// Writes a one-line, human-readable summary of the snapshot.
    ///
    /// The incoming section comes first, then the outgoing one. If no peers are
    /// wanted in either direction a single short notice is written instead.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        let wanted_incoming = self.wanted_incoming();
        let wanted_outgoing = self.wanted_outgoing();
        if wanted_incoming + wanted_outgoing == 0 {
            return write!(f, "not maintaining any connections; ");
        }

        // Incoming section.
        if wanted_incoming == 0 {
            write!(f, "not expecting any incoming connections; ")?;
        } else {
            write!(f, "expecting {wanted_incoming:?} incoming connections; ")?;
            if self.incoming_peers.is_empty() {
                // We warn about the lack of incoming connections, because this is relatively
                // likely to be a common misconfiguration; much less the case for outgoing.
                write!(f, "WARNING! No incoming peers even though we expected them, maybe connecting to us is impossible; ")?;
            } else {
                let connected = self.incoming_peers.len();
                let listing = pretty_peer_id_set(&self.incoming_peers);
                write!(f, "have - {:?} [{}]; ", connected, listing)?;
            }
            if !self.missing_incoming.is_empty() {
                let absent = self.missing_incoming.len();
                let listing = pretty_peer_id_set(&self.missing_incoming);
                write!(f, "missing - {:?} [{}]; ", absent, listing)?;
            }
        }

        // Outgoing section; no warning here when nothing is connected.
        if wanted_outgoing == 0 {
            write!(f, "not attempting any outgoing connections; ")?;
        } else {
            write!(f, "attempting {wanted_outgoing:?} outgoing connections; ")?;
            if !self.outgoing_peers.is_empty() {
                let connected = self.outgoing_peers.len();
                let listing = pretty_peer_id_set(&self.outgoing_peers);
                write!(f, "have - {:?} [{}]; ", connected, listing)?;
            }
            if !self.missing_outgoing.is_empty() {
                let absent = self.missing_outgoing.len();
                let listing = pretty_peer_id_set(&self.missing_outgoing);
                write!(f, "missing - {:?} [{}]; ", absent, listing)?;
            }
        }

        Ok(())
    }
}

/// Network component responsible for holding the list of peers that we
/// want to connect to or let them connect to us, and managing the established
/// connections.
///
/// `PK` identifies peers, `A` is the address type stored for wanted peers and
/// `D` is the type of data sent to connected peers.
pub struct Manager<PK: PublicKey + PeerId, A: Data, D: Data> {
    // Which peers we want to be connected with, and which way.
    wanted: DirectedPeers<PK, A>,
    // The peers we are connected with, each mapped to the sender used to pass data
    // to that connection. We ensure that this is always a subset of what we want.
    have: HashMap<PK, mpsc::UnboundedSender<D>>,
}

impl<PK: PublicKey + PeerId, A: Data, D: Data> Manager<PK, A, D> {
    /// Create a new Manager with empty list of peers.
    pub fn new(own_id: PK, metrics: Metrics) -> Self {
        Manager {
            wanted: DirectedPeers::new(own_id, metrics),
            have: HashMap::new(),
        }
    }

    fn active_connection(&self, peer_id: &PK) -> bool {
        self.have
            .get(peer_id)
            .map(|sender| !sender.is_closed())
            .unwrap_or(false)
    }

    /// Add a peer to the list of peers we want to stay connected to, or
    /// update the address if the peer was already added.
    /// Returns whether we should start attempts at connecting with the peer, which depends on the
    /// coorddinated pseudorandom decision on the direction of the connection and whether this was
    /// added for the first time.
    pub fn add_peer(&mut self, peer_id: PK, address: A) -> bool {
        self.wanted.add_peer(peer_id, address)
    }

    /// Return the address of the given peer, or None if we shouldn't attempt connecting with the peer.
    pub fn peer_address(&self, peer_id: &PK) -> Option<A> {
        self.wanted.peer_address(peer_id)
    }

    /// Add an established connection with a known peer, but only if the peer is among the peers we want to be connected to.
    pub fn add_connection(
        &mut self,
        peer_id: PK,
        data_for_network: mpsc::UnboundedSender<D>,
    ) -> AddResult {
        use AddResult::*;
        if !self.wanted.interested(&peer_id) {
            return Uninterested;
        }
        match self.have.insert(peer_id, data_for_network) {
            Some(_) => Replaced,
            None => Added,
        }
    }

    /// Remove a peer from the list of peers that we want to stay connected with.
    /// Close any incoming and outgoing connections that were established.
    pub fn remove_peer(&mut self, peer_id: &PK) {
        self.wanted.remove_peer(peer_id);
        self.have.remove(peer_id);
    }

    /// Send data to a peer.
    /// Returns error if there is no outgoing connection to the peer,
    /// or if the connection is dead.
    pub fn send_to(&mut self, peer_id: &PK, data: D) -> Result<(), SendError> {
        self.have
            .get(peer_id)
            .ok_or(SendError::PeerNotFound)?
            .unbounded_send(data)
            .map_err(|_| SendError::ConnectionClosed)
    }

    /// A status of the manager, to be displayed somewhere.
    pub fn status_report(&self) -> ManagerStatus<PK> {
        ManagerStatus::new(self)
    }

    pub fn is_authorized(&self, public_key: &PK) -> bool {
        self.wanted.interested(public_key)
    }
}

// Unit tests for `Manager`: peer bookkeeping and sending data over added connections.
#[cfg(test)]
mod tests {
    use futures::{channel::mpsc, StreamExt};

    use super::{AddResult::*, Manager, SendError};
    use crate::{
        metrics::Metrics,
        mock::{key, MockPublicKey},
    };

    // Both the data sent to peers and peer addresses are plain strings in these tests.
    type Data = String;
    type Address = String;

    // Adding, re-adding and removing peers, and querying their addresses along the way.
    #[test]
    fn add_remove() {
        let (own_id, _) = key();
        let mut manager = Manager::<MockPublicKey, Address, Data>::new(own_id, Metrics::noop());
        let (peer_id, _) = key();
        let (peer_id_b, _) = key();
        let address = String::from("43.43.43.43:43000");
        // add new peer - might return either true or false, depending on the ids
        let attempting_connections = manager.add_peer(peer_id.clone(), address.clone());
        // add known peer - always returns false
        assert!(!manager.add_peer(peer_id.clone(), address.clone()));
        // get address - only available if we are the side attempting the connection
        match attempting_connections {
            true => assert_eq!(manager.peer_address(&peer_id), Some(address)),
            false => assert_eq!(manager.peer_address(&peer_id), None),
        }
        // try to get address of an unknown peer
        assert_eq!(manager.peer_address(&peer_id_b), None);
        // remove peer
        manager.remove_peer(&peer_id);
        // try to get address of removed peer
        assert_eq!(manager.peer_address(&peer_id), None);
        // remove again - must not panic
        manager.remove_peer(&peer_id);
        // remove unknown peer - must not panic
        manager.remove_peer(&peer_id_b);
    }

    // Two managers that want each other: data sent through an added connection arrives at
    // the matching receiver, and removing the peer ends the stream.
    #[tokio::test]
    async fn send_receive() {
        let (mut connecting_id, _) = key();
        let mut connecting_manager =
            Manager::<MockPublicKey, Address, Data>::new(connecting_id.clone(), Metrics::noop());
        let (mut listening_id, _) = key();
        let mut listening_manager =
            Manager::<MockPublicKey, Address, Data>::new(listening_id.clone(), Metrics::noop());
        let data = String::from("DATA");
        let address = String::from("43.43.43.43:43000");
        let (tx, _rx) = mpsc::unbounded();
        // try add unknown peer
        assert_eq!(
            connecting_manager.add_connection(listening_id.clone(), tx),
            Uninterested
        );
        // sending should fail
        assert_eq!(
            connecting_manager.send_to(&listening_id, data.clone()),
            Err(SendError::PeerNotFound)
        );
        // add peer, this time for real
        if connecting_manager.add_peer(listening_id.clone(), address.clone()) {
            assert!(!listening_manager.add_peer(connecting_id.clone(), address.clone()))
        } else {
            // We need to switch the names around, because the connection was randomly the
            // other way around.
            std::mem::swap(&mut connecting_id, &mut listening_id);
            std::mem::swap(&mut connecting_manager, &mut listening_manager);
            assert!(connecting_manager.add_peer(listening_id.clone(), address.clone()));
        }
        // add outgoing to connecting
        let (tx, mut rx) = mpsc::unbounded();
        assert_eq!(
            connecting_manager.add_connection(listening_id.clone(), tx),
            Added
        );
        // send and receive connecting
        assert!(connecting_manager
            .send_to(&listening_id, data.clone())
            .is_ok());
        assert_eq!(data, rx.next().await.expect("should receive"));
        // add incoming to listening
        let (tx, mut rx) = mpsc::unbounded();
        assert_eq!(
            listening_manager.add_connection(connecting_id.clone(), tx),
            Added
        );
        // send and receive listening
        assert!(listening_manager
            .send_to(&connecting_id, data.clone())
            .is_ok());
        assert_eq!(data, rx.next().await.expect("should receive"));
        // remove peer - this drops the only sender for `rx`
        listening_manager.remove_peer(&connecting_id);
        // receiving should fail
        assert!(rx.next().await.is_none());
    }
}