// Copyright 2017 ETH Zurich. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![deny(missing_docs)]

//! Internal APIs of the Strymon coordinator.
//!
//! This library contains the implementation and the internals of the Strymon coordinator. This
//! crate is not intended to be used by end-users directly. Most likely, you will want to use the
//! [`strymon` command line utility](https://strymon-system.github.io/docs/command-line-interface)
//! to start a new coordinator instead.
//!
//! # Implementation
//!
//! The Strymon coordinator maintains a connection with most components of a running Strymon
//! cluster. In order to handle concurrent requests, its implementation is heavily based on
//! [`futures`](https://docs.rs/futures). Each potentially blocking request is transformed into a
//! future and polled to completion by a `tokio-core` reactor (this dependency is likely to be
//! replaced by the `LocalPool` executor in futures 0.2, as the implementation does not rely on
//! Tokio's asynchronous I/O primitives).
//!
//! The coordinator implementation also includes the [**Catalog**](catalog/index.html), a data
//! structure representing the current state of the Strymon cluster.
//!
//! ## Exposed network services
//!
//! The coordinator exposes two
//! **[`strymon_communication::rpc`](../strymon_communication/rpc/index.html)** interfaces:
//!
//!   1. [`CoordinatorRPC`](../strymon_rpc/coordinator/index.html) for submitting and managing jobs.
//!   Its address has to be known by clients in advance. By default, the coordinator will try
//!   to expose this service on TCP port `9189`.
//!   2. [`CatalogRPC`](../strymon_rpc/coordinator/catalog/index.html) for querying the catalog
//!   using the [`Client`](../strymon_job/operators/service/struct.Client.html) infrastructure of
//!   [`strymon_job`](../strymon_job/index.html). It is exported on an ephemeral TCP port which
//!   can be obtained through a `Subscription` or `Lookup` request on the coordinator interface.
//!
//! ## Handling clients and concurrent requests
//!
//! The coordinator maintains a connection to each connected client (which can be a submitter, an
//! executor or a job). Incoming client requests are handled by the
//! [`Dispatch`](dispatch/struct.Dispatch.html) type, which is created for each accepted
//! connection.
//!
//! The [`Coordinator`](handler/struct.Coordinator.html) type implements the bulk of request
//! handling and contains the state shared by all clients. Its external interface is mirrored
//! through the cloneable [`CoordinatorRef`](handler/struct.CoordinatorRef.html) handle. It is
//! essentially a wrapper around `Rc<RefCell<Coordinator>>`, but it also tracks the state created
//! by its client (such as issued publications). This allows the coordinator to automatically
//! remove that state once the associated client disconnects.
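//!
//! The following is a simplified sketch of that ownership pattern (the `State` and `StateRef`
//! names are invented for this example and are *not* the actual `CoordinatorRef` API): a
//! per-client handle shares the coordinator state but remembers what it created, and undoes it
//! in its `Drop` implementation.
//!
//! ```rust
//! use std::rc::Rc;
//! use std::cell::RefCell;
//!
//! // Stand-in for the shared coordinator state.
//! struct State { publications: Vec<String> }
//!
//! // Stand-in for a per-client handle: shares the state, but tracks what it created.
//! struct StateRef {
//!     shared: Rc<RefCell<State>>,
//!     owned: Vec<String>, // publications issued through this handle
//! }
//!
//! impl StateRef {
//!     fn publish(&mut self, name: &str) {
//!         self.shared.borrow_mut().publications.push(name.to_string());
//!         self.owned.push(name.to_string());
//!     }
//! }
//!
//! impl Drop for StateRef {
//!     fn drop(&mut self) {
//!         // remove everything this client created when it disconnects
//!         let mut state = self.shared.borrow_mut();
//!         state.publications.retain(|p| !self.owned.contains(p));
//!     }
//! }
//! # fn main() {}
//! ```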
//!
//! A client might issue a request which cannot be handled immediately. Such requests (e.g.
//! a blocking subscription request which only resolves once a matching topic is published) are
//! implemented as futures, which are polled to completion by the internal `tokio-core` reactor.
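//!
//! A rough sketch of that pattern, using a `futures` 0.1 oneshot channel (the
//! `PendingSubscriptions` type and its methods below are invented for illustration and are not
//! part of the actual handler):
//!
//! ```rust
//! extern crate futures;
//!
//! use std::collections::HashMap;
//! use futures::sync::oneshot;
//!
//! // Subscribers waiting for a topic that has not been published yet.
//! struct PendingSubscriptions {
//!     waiting: HashMap<String, Vec<oneshot::Sender<String>>>,
//! }
//!
//! impl PendingSubscriptions {
//!     // Returns a future which resolves once the topic is published. The caller hands
//!     // this future to the reactor instead of blocking the event loop.
//!     fn subscribe(&mut self, topic: &str) -> oneshot::Receiver<String> {
//!         let (tx, rx) = oneshot::channel();
//!         self.waiting.entry(topic.to_string()).or_insert_with(Vec::new).push(tx);
//!         rx
//!     }
//!
//!     // Called when a matching topic appears: completes all pending futures.
//!     fn publish(&mut self, topic: &str, addr: String) {
//!         for tx in self.waiting.remove(topic).unwrap_or_default() {
//!             let _ = tx.send(addr.clone());
//!         }
//!     }
//! }
//! # fn main() {}
//! ```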


#[macro_use]
extern crate log;
extern crate rand;
extern crate futures;
extern crate futures_timer;
extern crate tokio_core;

extern crate strymon_rpc;
extern crate strymon_model;
extern crate strymon_communication;

use std::io::Result;

use futures::stream::Stream;
use tokio_core::reactor::Core;

use strymon_communication::Network;

use strymon_rpc::coordinator::CoordinatorRPC;

use self::handler::Coordinator;
use self::dispatch::Dispatch;
use self::catalog::Catalog;

pub mod handler;
pub mod catalog;
pub mod dispatch;

mod util;

/// A builder for configuring and launching a new coordinator instance.
///
/// # Examples
/// ```rust,no_run
/// use strymon_coordinator::Builder;
///
/// let mut coord = Builder::default();
/// coord
///     .hostname("localhost".to_string())
///     .port(9189);
/// coord
///     .run()
///     .expect("failed to run coordinator");
/// ```
pub struct Builder {
    port: u16,
    hostname: Option<String>,
}

impl Builder {
    /// Sets the externally reachable hostname of this machine
    /// (default: [*inferred*](../strymon_communication/struct.Network.html#method.new)).
    pub fn hostname(&mut self, hostname: String) -> &mut Self {
        self.hostname = Some(hostname);
        self
    }

    /// Sets the port on which the coordinator service is exposed (default: `9189`).
    pub fn port(&mut self, port: u16) -> &mut Self {
        self.port = port;
        self
    }
}

impl Default for Builder {
    fn default() -> Self {
        Builder { port: 9189, hostname: None }
    }
}

impl Builder {
    /// Starts and runs a new coordinator instance.
    ///
    /// This blocks the current thread until the coordinator service shuts down, which currently
    /// only happens if an error occurs.
    ///
    /// Internally, this first creates a new
    /// [`CoordinatorRPC`](../strymon_rpc/coordinator/index.html) and a
    /// [`CatalogRPC`](../strymon_rpc/coordinator/catalog/index.html) service, instantiates an
    /// empty [`Catalog`](catalog/struct.Catalog.html) and then dispatches requests to be handled
    /// by request [`handler`](handler/index.html) logic.
    pub fn run(self) -> Result<()> {
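        // create the network handle and expose the `CoordinatorRPC` service on the configured port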
        let network = Network::new(self.hostname)?;
        let server = network.server::<CoordinatorRPC, _>(self.port)?;

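        // single-threaded reactor which polls all coordinator futures to completion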
        let mut core = Core::new()?;
        let handle = core.handle();
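        // create the catalog service, which answers `CatalogRPC` requests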
        let (catalog_addr, catalog_service) = catalog::Service::new(&network, &handle)?;

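        // defer setting up the shared coordinator state until the future is polled by the reactor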
        let coordinate = futures::lazy(move || {
            let catalog = Catalog::new(catalog_addr);
            let coord = Coordinator::new(catalog, handle.clone());

            // dispatch requests for the catalog
            let catalog_coord = coord.clone();
            handle.spawn(catalog_service.for_each(move |req| {
                catalog_coord.catalog_request(req).map_err(|err| {
                    error!("Invalid catalog request: {:?}", err)
                })
            }));

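            // accept incoming client connections and drive a request dispatcher for each of them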
            server.for_each(move |(tx, rx)| {
                let disp = Dispatch::new(coord.clone(), handle.clone(), tx);
                disp.client(rx)
            })
        });

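        // block on the reactor until the coordinator shuts down (currently only on error)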
        core.run(coordinate)
    }
}