Creating A Cluster

Local clusters can be defined easily using the AgentFactory class. The AgentFactory initialises a given number of agents, n, with the functionality required for an Agent to run, including a ServiceManager and other core functionality. Let’s dive in and create five agents:

AgentFactory agentFactory = new AgentFactory();
List<Agent> agents = agentFactory.createN(5);

The above initialises each agent with a human-readable name, a PeerId (which uniquely identifies each Agent), and a ServiceManager that has no registered services. This is as bare-bones as a default template for creating an Agent can get.

The Service Manager

The ServiceManager is the backbone of each Agent. Each Agent is assigned one and can have a number of AgentService instances registered against it. The ServiceManager initialises each registered service and ensures that it starts correctly and transitions to a running state. If a service fails to start, or fails at a later time, the ServiceManager notifies all running services that it has failed. Let’s initialise all of our Agents with a few services:

for (Agent agent : agents) {
    ServiceManager serviceManager = agent.serviceManager();

    serviceManager.registerService(new ConsensusNodeImpl(new ConsensusClusterConfig()));
    serviceManager.registerService(new SubsystemMonitorSpoofer());
    serviceManager.registerService(new MissionManagerSample());
    serviceManager.registerService(new LogServiceImpl());

    serviceManager.startServices();

    agent.state().transitionTo(AgentState.State.ACTIVE);
}
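
The failure handling described for the ServiceManager can be pictured with a small, self-contained sketch. It is not the UNDERSEA implementation: FailureNotificationSketch, its nested Service class and the reportFailure method are hypothetical names used only to illustrate the idea that, when one service fails, every service that is still running is told about it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a service manager; not the UNDERSEA ServiceManager.
class FailureNotificationSketch {

    // Hypothetical service: a name, a running flag and a record of the failures it was told about.
    static final class Service {
        final String name;
        boolean running = true;
        final List<String> failuresSeen = new ArrayList<>();

        Service(String name) {
            this.name = name;
        }
    }

    private final List<Service> services = new ArrayList<>();

    void register(Service service) {
        services.add(service);
    }

    // Marks the given service as failed, then notifies every service that is still running.
    void reportFailure(Service failed) {
        failed.running = false;
        for (Service service : services) {
            if (service.running) {
                service.failuresSeen.add(failed.name);
            }
        }
    }
}
```

In this sketch the failed service is excluded from the notification because it is no longer running; whether the real ServiceManager behaves the same way is an assumption.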

All services must implement the AgentService interface before they can be registered. This interface defines much of the functionality that the ServiceManager uses to manage services effectively. For example, ConsensusNodeImpl depends on a number of other services in order to function correctly. These are declared by overriding the AgentService#requiredServices method, and the ServiceManager ensures that they are registered during its startup procedure. In addition, some services may take a long time to start up and transition to a running state, but the ServiceManager only allows a certain transition timeout period to elapse. This period can be changed by overriding AgentService#transitionTimeout. To initiate this process, the services are started with serviceManager.startServices() and the Agent is then transitioned to an active state with agent.state().transitionTo(AgentState.State.ACTIVE).
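
To make the dependency check more concrete, here is a minimal sketch of how a manager might verify required services and read a per-service timeout. The method names requiredServices and transitionTimeout come from the text above, but their signatures, the default timeout values and every type here (SketchService, LogSketch, ConsensusSketch, SketchServiceManager) are assumptions made for illustration and do not reflect the real UNDERSEA API.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for AgentService.
interface SketchService {

    // Service types that must be registered before this service can start.
    default Collection<Class<? extends SketchService>> requiredServices() {
        return Collections.emptyList();
    }

    // How long a manager would wait for this service to reach a running state (assumed default).
    default Duration transitionTimeout() {
        return Duration.ofSeconds(5);
    }
}

// A service with no dependencies that keeps the default timeout.
class LogSketch implements SketchService {
}

// A service that depends on LogSketch and asks for a longer startup window.
class ConsensusSketch implements SketchService {

    @Override
    public Collection<Class<? extends SketchService>> requiredServices() {
        return Collections.singletonList(LogSketch.class);
    }

    @Override
    public Duration transitionTimeout() {
        return Duration.ofSeconds(30);
    }
}

// Hypothetical manager that only performs the dependency check.
class SketchServiceManager {

    private final Map<Class<? extends SketchService>, SketchService> services = new LinkedHashMap<>();

    void registerService(SketchService service) {
        services.put(service.getClass(), service);
    }

    // Returns every required service type that has not been registered.
    List<Class<? extends SketchService>> missingDependencies() {
        List<Class<? extends SketchService>> missing = new ArrayList<>();
        for (SketchService service : services.values()) {
            for (Class<? extends SketchService> required : service.requiredServices()) {
                if (!services.containsKey(required)) {
                    missing.add(required);
                }
            }
        }
        return missing;
    }
}
```

A manager built along these lines could refuse to start while missingDependencies() is non-empty, and could use each service's transitionTimeout() as the upper bound when waiting for it to report a running state.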

Clients

The UNDERSEA project has been developed with the view that the end product will be a number of active robots in some unknown environment. However, no automatic peer discovery is implemented at this time. Local node discovery can be performed as follows:

for (Agent a : agents) {
    for (Agent b : agents) {
        if (a != b) {
            ConsensusNodeImpl consensusNodeA = a.serviceManager().getService(ConsensusNodeImpl.class);
            ConsensusNodeImpl consensusNodeB = b.serviceManager().getService(ConsensusNodeImpl.class);

            consensusNodeA.state().discoverNode(consensusNodeB);
        }
    }
}

This informs each Agent of another client and builds a gRPC client, which is used to perform the required consensus algorithm tasks for as long as that client is available. Once an Agent discovers another, a voting round starts automatically and a leader is elected.

Consensus Algorithm

The UNDERSEA project uses a custom implementation of the Raft consensus algorithm in order to deterministically elect a leader within a cluster. This leader performs a number of tasks, manages the cluster state and works towards ensuring that a mission is completed successfully. The election follows a three-step process:

  • Each node finds out the current state of every client in the cluster and works out an overall cost associated with that client. The cost comprises the client’s battery level, the number of subsystems present on it and their accuracy, and can be configured in your implementation. We wouldn’t want a client with a low battery to be elected leader and then take on more tasks.
  • Every node votes for who they calculated to have the lowest cost.
  • The leader is elected and transitions to a leader state.

This process happens every time a client joins or leaves a cluster to ensure that the most suitable leader is always elected. Following this, there are two further principles that can be introduced: Transactions and ServiceCallbacks.
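
The three election steps can be sketched in plain Java. Everything here is hypothetical: the ClientState fields, the value ranges and in particular the cost formula are assumptions chosen for illustration, since the text only says that the cost combines battery level, subsystem count and accuracy and is configurable.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of cost-based voting; not the UNDERSEA consensus code.
class LeaderVoteSketch {

    // Snapshot of one client's state as seen by a voting node.
    static final class ClientState {
        final String name;
        final double batteryLevel;   // 0.0 (empty) to 1.0 (full)
        final int subsystemCount;    // number of subsystems present
        final double meanAccuracy;   // 0.0 to 1.0

        ClientState(String name, double batteryLevel, int subsystemCount, double meanAccuracy) {
            this.name = name;
            this.batteryLevel = batteryLevel;
            this.subsystemCount = subsystemCount;
            this.meanAccuracy = meanAccuracy;
        }
    }

    // Assumed cost function: low battery, few subsystems and low accuracy all raise the cost.
    static double cost(ClientState state) {
        return (1.0 - state.batteryLevel)
                + 1.0 / (1 + state.subsystemCount)
                + (1.0 - state.meanAccuracy);
    }

    // Steps 1 and 2: a node costs every client and votes for the cheapest one.
    static String vote(List<ClientState> cluster) {
        ClientState best = cluster.get(0);
        for (ClientState candidate : cluster) {
            if (cost(candidate) < cost(best)) {
                best = candidate;
            }
        }
        return best.name;
    }

    // Step 3: the candidate with the most votes becomes leader (first to reach the top count wins ties).
    static String electLeader(List<String> votes) {
        Map<String, Integer> tally = new HashMap<>();
        String leader = null;
        int bestCount = 0;
        for (String candidate : votes) {
            int count = tally.merge(candidate, 1, Integer::sum);
            if (count > bestCount) {
                bestCount = count;
                leader = candidate;
            }
        }
        return leader;
    }
}
```

If every node sees the same cluster state and applies the same cost function, every node casts the same vote, which is what makes the outcome deterministic in this sketch.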

Transactions and Callbacks

Transactions lie at the core of how one AgentService can communicate with another in a simple fashion. If, for example, a service wishes to propagate a particular life cycle event to all other services, then a Transaction is the most suitable choice. In the example below, the Transaction, together with the handling of its response, is registered as a ServiceCallback against the ConsensusNodeImpl, which will fire automatically when the node is elected as a leader.

consensusNode.registerCallback(new ServiceCallback(LifecycleEvent.ELECTED_LEADER, () -> {
    Transaction transaction = new Transaction.Builder(agent)
            .forService(MissionManager.class)
            .withStatus(LifecycleEvent.ELECTED_LEADER)
            .usingExecutorService(consensusNode.getListeningExecutorService())
            .invokedBy(consensusNode)
            .build();

    Set<ListenableFuture<?>> futures = agent.serviceManager().commitTransaction(transaction);

    for (ListenableFuture<?> future : futures) {
        Futures.addCallback(future, new FutureCallback<Object>() {
            @Override
            public void onSuccess(@Nullable Object result) {
                consensusNode.distributeMission((GeneratedMission) result);
            }

            @Override
            public void onFailure(Throwable t) {
                throw new RuntimeException(t);
            }

        }, consensusNode.getSingleThreadScheduledExecutor());
    }
}));

The above constructs a Transaction addressed to the registered MissionManager service, notifying it that the ConsensusNode has been elected cluster leader. The ServiceManager commits the transaction to that service and returns a set of futures to which we can add callbacks. This, however, requires that the MissionManager has overridden the AgentService#executeTransaction method. If it has not, the transaction will be lost and nothing will be returned. The destination service can switch on the status set on the transaction to ensure that it executes the correct path. The sample implementation for MOOS uses this system to fire a transaction to all services and transition the system to a leader-elected state. This causes the registered mission manager to decompose the target polygon, generate mission paths for each client and distribute them accordingly.
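
As a rough illustration of a destination service switching on a transaction's status, consider the sketch below. It is an assumption-laden stand-in: the Event enum is a hypothetical subset of lifecycle events (only ELECTED_LEADER appears in this tutorial), SketchTransaction is not the real Transaction class, and the real AgentService#executeTransaction signature and return type may differ.

```java
// Hypothetical sketch of status-based dispatch; not the UNDERSEA MissionManager.
class TransactionDispatchSketch {

    // Assumed lifecycle events; SHUTDOWN and FAILING are invented for the example.
    enum Event { ELECTED_LEADER, SHUTDOWN, FAILING }

    // Minimal stand-in for a transaction that carries only a status.
    static final class SketchTransaction {
        final Event status;

        SketchTransaction(Event status) {
            this.status = status;
        }
    }

    // Stand-in for an overridden executeTransaction: picks a code path from the status.
    static Object executeTransaction(SketchTransaction transaction) {
        switch (transaction.status) {
            case ELECTED_LEADER:
                // A real mission manager would decompose the target area and plan missions here.
                return "mission-generated";
            case SHUTDOWN:
                return "stopped";
            default:
                // Statuses this service does not handle produce no result.
                return null;
        }
    }
}
```

The value returned from the handled branch plays the role of the result that the caller receives in onSuccess once the corresponding future completes.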

Complete code

package com.type2labs.undersea.tutorials.tutoriala;


import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.type2labs.undersea.common.agent.Agent;
import com.type2labs.undersea.common.agent.AgentFactory;
import com.type2labs.undersea.common.agent.AgentState;
import com.type2labs.undersea.common.consensus.ConsensusClusterConfig;
import com.type2labs.undersea.common.logger.LogServiceImpl;
import com.type2labs.undersea.common.missions.planner.model.GeneratedMission;
import com.type2labs.undersea.common.missions.planner.model.MissionManager;
import com.type2labs.undersea.common.monitor.impl.SubsystemMonitorSpoofer;
import com.type2labs.undersea.common.service.ServiceManager;
import com.type2labs.undersea.common.service.transaction.LifecycleEvent;
import com.type2labs.undersea.common.service.transaction.ServiceCallback;
import com.type2labs.undersea.common.service.transaction.Transaction;
import com.type2labs.undersea.prospect.impl.ConsensusNodeImpl;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.List;
import java.util.Set;

public class RunnerA {

    static {
        LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
        Configuration config = ctx.getConfiguration();
        LoggerConfig loggerConfig = config.getLoggerConfig("io.netty");
        loggerConfig.setLevel(Level.INFO);
        ctx.updateLoggers();
    }

    public static void main(String[] args) {
        AgentFactory agentFactory = new AgentFactory();
        List<Agent> agents = agentFactory.createN(5);

        for (Agent agent : agents) {
            ServiceManager serviceManager = agent.serviceManager();
            ConsensusNodeImpl consensusNode = new ConsensusNodeImpl(new ConsensusClusterConfig());
            consensusNode.registerCallback(new ServiceCallback(LifecycleEvent.ELECTED_LEADER, () -> {
                Transaction transaction = new Transaction.Builder(agent)
                        .forService(MissionManager.class)
                        .withStatus(LifecycleEvent.ELECTED_LEADER)
                        .usingExecutorService(consensusNode.getListeningExecutorService())
                        .invokedBy(consensusNode)
                        .build();

                Set<ListenableFuture<?>> futures = agent.serviceManager().commitTransaction(transaction);

                for (ListenableFuture<?> future : futures) {
                    Futures.addCallback(future, new FutureCallback<Object>() {
                        @Override
                        public void onSuccess(@Nullable Object result) {
                            consensusNode.distributeMission((GeneratedMission) result);
                        }

                        @Override
                        public void onFailure(Throwable t) {
                            throw new RuntimeException(t);
                        }

                    }, consensusNode.getSingleThreadScheduledExecutor());
                }
            }));

            serviceManager.registerService(consensusNode);
            serviceManager.registerService(new SubsystemMonitorSpoofer());
            serviceManager.registerService(new MissionManagerSample());
            serviceManager.registerService(new LogServiceImpl());

            serviceManager.startServices();

            agent.state().transitionTo(AgentState.State.ACTIVE);
        }

        for (Agent a : agents) {
            for (Agent b : agents) {
                if (a != b) {
                    ConsensusNodeImpl consensusNodeA = a.serviceManager().getService(ConsensusNodeImpl.class);
                    ConsensusNodeImpl consensusNodeB = b.serviceManager().getService(ConsensusNodeImpl.class);

                    consensusNodeA.state().discoverNode(consensusNodeB);
                }
            }
        }

    }

}