Esquema maestro/trabajador con Java/RMI (MasterWorker)

Se trata de un proyecto práctico de desarrollo en grupos de 2 personas cuyo plazo de entrega termina el 27 de mayo.

Consideraciones previas

Antes de describir la práctica propiamente dicha, se considera conveniente resaltar algunos aspectos sobre la misma:

Grados de libertad en el desarrollo de la práctica

Aunque todavía no sabemos nada de la práctica y no se puede enteder completamente el contenido de esta sección, se ha considerado oportuno incluir al principio del documento, para que no pasen inadvertidas, qué restricciones existen a la hora de desarrollar el código de la práctica:

Objetivo de la práctica

La práctica consiste en desarrollar un esquema maestro-trabajador que, como se ha analizado en la parte teórica de la asignatura, es el esquema de procesamiento más habitual en los entornos de computación distribuida, estando detrás de modelos tan populares como MapReduce.

En este esquema hay básicamente tres roles, que, normalmente, corresponderán a procesos ejecutando en distintas máquinas:

Repasemos a continuación el modo de operación de este modelo de procesamiento:

  1. El maestro recibe la petición de ejecutar un trabajo usando un determinado número de trabajadores.
  2. La descripción del trabajo incluye los datos de entrada del mismo, la tarea que debe realizarse sobre cada dato de entrada produciendo un resultado, así como la labor que se llevará a cabo con los resultados obtenidos.
  3. El maestro solicita al gestor el número de trajadores requeridos por el trabajo y el gestor se los asigna. Previamente, cada proceso trabajador se habrá dado de alta en el gestor.
  4. El maestro distribuye el código de la tarea entre los trabajadores asignados.
  5. El maestro va obteniendo cada dato del trabajo enviándoselo a cualquiera de los trabajadores asignados que esté libre, teniendo que esperar en caso de que todos estuvieran ocupados. También se puede considerar un esquema en el que un trabajador puede estar procesando simultáneamente hasta N datos para aprovechar su potencia de procesamiento (piense en un trabajador multinúcleo), pero para la práctica, por sencillez, hemos planteado un esquema con un dato por trabajador en cada momento.
  6. El trabajador completa el trabajo enviando el resultado.
  7. El maestro va procesando los resultados obtenidos hasta que recibe el último resultado, dando por completada la ejecución del trabajo informándole al gestor para que sepa que esos nodos trabajadores están disponibles para otros trabajos.
  8. Dado el elevado tiempo de ejecución de este tipo de trabajos y el gran número de nodos involucrados, hay una probabilidad no despreciable de que se caiga algún nodo durante la evolución del mismo. En el caso de la caída del maestro, en el esquema maestro-trabajador básico se pierde toda la computación (para evitarlo, habría que implementar un esquema de replicación para la funcionalidad del maestro). Si se trata de un trabajador, el maestro debería detectar esa caída para enviar la tarea no completada a otro nodo. En la práctica, no vamos a resolver ese escenario de error donde el trabajador se cae en la mitad de una tarea, pero sí uno más sencillo: el trabajador está caído cuando le enviamos el dato de un trabajo y, por tanto, hay que enviar ese dato a otro trabajador.

En cuanto a la tecnología de comunicación usada en la práctica, se ha elegido Java RMI (si no está familiarizado con el uso de esta tecnología puede consultar esta guía sobre la programación en Java RMI). Es importante resaltar que, gracias al mecanismo de descarga dinámica del código de las clases, la tecnología Java RMI facilita considerablemente la implementación de este esquema automatizando directamente el cuarto paso de la lista previa.

Una visión global preliminar

Para entender mejor las características del sistema que se pretende desarrollar, consideramos conveniente mostrar a priori qué pasos habría que llevar a cabo para ejecutar un trabajo en este sistema.

Con respecto a la fase de compilación, hay un script de compilación (compila.sh) dentro del directorio asociado a cada uno de los tres roles identificados, así como en el directorio donde se definen las interfaces comunes. Nótese que en un sistema de estas características normalmente cada máquina estará dedicada a realizar un determinado rol y solo necesitará tener instalado el software correspondiente a dicho rol, así como tener acceso al fichero de tipo JAR donde se encuentran definidas las interfaces comunes. Dado que, por comodidad, vamos a desarrollar y probar el código en local, ese fichero JAR se va hacer accesible a través de un enlace simbólico y se va a proporcionar un script (compila_todo.sh) para realizar la compilación de todo el código apoyándose en los scripts correspondientes de cada directorio.

Una vez compilado el código, hay que lanzar en cada máquina el software correspondiente al rol que va a interpretar esa máquina. Para ello, se dispone para cada rol de un script (ejecuta.sh) que lleva a cabo esa labor. Nótese que, por razones evidentes, el gestor es el único proceso que se va dar de alta en el RMI Registry y, por tanto, en ese nodo hay que arrancar también ese proceso de Java RMI (se proporciona para ello el script arranca_rmiregistry.sh, que recibe como argumento por qué puerto queremos que dé servicio este proceso):

fperez@maq_manager: ./arranca_rmiregistry.sh 12345 &
fperez@maq_manager: ./ejecuta.sh 12345 
A continuación, hay que lanzar todos los trabajadores (supongamos que hay 1000), que se darán de alta en el gestor, indicándoles en qué máquina y por qué puerto da servicio RMI Registry:
fperez@maq_worker1: ./ejecuta.sh maq_manager 12345
fperez@maq_worker1000: ./ejecuta.sh maq_manager 12345
En este punto ya podemos ejecutar el trabajo en el nodo maestro, teniendo que especificar, además de en qué máquina y por qué puerto da servicio RMI Registry, la clase que describe el trabajo junto con el número de trabajadores que queremos usar:
fperez@maq_master1: ./ejecuta.sh maq_manager 12345 jobs.SimpleJob 1000

Anatomía de un trabajo

Dado que la práctica está centrada en la ejecución de trabajos, en esta sección se va a presentar la clase abstracta que describe un trabajo, que, como se comentó previamente, no se puede modificar, así como un ejemplo de una clase derivada de la misma que representa un trabajo muy sencillo: calcular la suma del cuadrado de una colección de tamaño aleatorio de números aleatorios que se reciben como entrada; evidentemente, se trata de un trabajo que no tiene sentido ejecutarlo de forma remota puesto que el procesamiento de cada dato de entrada conlleva un tiempo despreciable y, a diferencia del resto de los trabajos propuestos, solo tiene interés didáctico.

Para poder entender las diferencias entre las dos versiones de la práctica, se van a presentar tanto la versión genérica, que permite que el dato de entrada y el resultado sean de cualquier tipo, como la básica, en la que el tipo del dato de entrada y el que corresponde al resultado son de la clase String.

La clase abstracta Job, incluida en el paquete jobs, permite definir un trabajo. A continuación, se muestra la versión genérica de la misma:

// Clase abstracta que define un trabajo
package jobs;
import java.io.*;

import interfaces.*;

public abstract class Job<T,S> {
    Task <T, S> t = null;

    Job(Task<T,S> t) {
        this.t = t;
    }

    public boolean initJob(int nWorkers) {
        return true;
    }

    public Task<T,S> getTask() {
        return t;
    }

    public abstract T getNextInput() throws EOFException;

    public abstract void putResult(T input, S result, Exception e);

    public void endJob() {
    }
}
Y acto seguido la versión básica:
// Clase abstracta que define un trabajo
package jobs;
import java.io.*;

import interfaces.*;

public abstract class Job {
    Task t = null;

    Job(Task t) {
        this.t = t;
    }

    public boolean initJob(int nWorkers) {
        return true;
    }

    public Task getTask() {
        return t;
    }

    public abstract String getNextInput() throws EOFException;

    public abstract void putResult(String input, String result, Exception e);

    public void endJob() {
    }
}
Para comprender cómo se comporta esta clase abstracta, a continuación, se muestra el ejemplo de un trabajo muy sencillo (clase SimpleJob del paquete jobs) en su versión genérica:
// Trabajo muy simple (es ridículo ejecutarlo de forma remota)
package jobs;

import java.util.*;
import java.io.*;
import interfaces.*;

public class SimpleJob extends Job<Double, Double> {
    int inputDataSize;
    Double [] inputData;
    int count=0;
    double total = 0;

    public SimpleJob() {
        super(new SimpleTask());
    }
@Override
    public boolean initJob(int nWorkers) {
        inputDataSize = ((int) (1000000 * Math.random()) % 100) + 1;
        inputData = new Double[inputDataSize];
        for (int i=0; i<inputData.length; i++)
            inputData[i] = Math.random();
        return true;
    }
@Override
    public Double getNextInput() throws EOFException  {
        if (count<inputDataSize) return inputData[count++];
        else throw new EOFException();
    }
@Override
    public void putResult(Double input, Double result, Exception e) {
        if (e==null) {
            System.out.println("cuadrado de " + input + " = " + result);
            total += result;
        }
        else
            System.err.println(input + ": recibida excepción imprevista " + e.toString());
    }
@Override
    public void endJob() {
        System.out.println("Total = " + total);
    }
}
Y en la versión básica que usa el tipo String tanto para datos como para resultados (observe la necesidad de conversiones de String a Double y viceversa):
/ Trabajo muy simple (es ridículo ejecutarlo de forma remota)
package jobs;

import java.util.*;
import java.io.*;
import interfaces.*;

public class SimpleJob extends Job {
    int inputDataSize;
    Double [] inputData;
    int count=0;
    double total = 0;

    public SimpleJob() {
        super(new SimpleTask());
    }
@Override
    public boolean initJob(int nWorkers) {
        inputDataSize = ((int) (1000000 * Math.random()) % 100) + 1;
        inputData = new Double[inputDataSize];
        for (int i=0; i<inputData.length; i++)
            inputData[i] = Math.random();
        return true;
    }
@Override
    public String getNextInput() throws EOFException  {
        if (count<inputDataSize) return inputData[count++].toString();
        else throw new EOFException();
    }
@Override
    public void putResult(String input, String result, Exception e) {
        if (e==null) {
            System.out.println("cuadrado de " + input + " = " + result);
            total += Double.parseDouble(result);
        }
        else
            System.err.println(input + ": recibida excepción imprevista " + e.toString());
    }
@Override
    public void endJob() {
        System.out.println("Total = " + total);
    }
}
Basándonos en la clase abstracta y en el ejemplo de clase derivada podemos explicar el modo de operación de un trabajo, teniendo en mente los tres componentes que lo definen: la fuente de datos de entrada que alimenta el trabajo, la computación que se aplica a cada dato (la tarea; nótese que un trabajo tiene asociadas dos clases: una que representa el trabajo propiamente dicho y otra, que se analizará más adelante, que corresponde a la tarea que hay que realizar sobre cada dato de entrada) y el tratamiento que se realiza con el resultado de procesar cada dato: Un trabajo, por tanto, queda definido por una clase derivada de la clase Job que representa el trabajo propiamente dicho y una clase que implementa la interfaz Task. Nótese que se trata de una interfaz que tienen que compartir el maestro y los trabajadores pero no es remota. En consecuencia, la clase que implementa esta interfaz viajará serializada desde el maestro a los trabajadores y, por tanto, además de Task, debe implementar la interfaz Serializable.

A continuación, se muestra la interfaz para la versión genérica:

// Interfaz que debe satisfacer una tarea para poder ser procesada
// No se trata de una interfaz remota. Por tanto, una clase que implementa esta
// interfaz se transferirá "serializada" del maestro al trabajador.
package interfaces;

public interface Task <T,S> {
    public S execute(T t) throws Exception;
}
Y acto seguido la correspondiente a la versión básica:
// Interfaz que debe satisfacer una tarea para poder ser procesada
// No se trata de una interfaz remota. Por tanto, una clase que implementa esta
// interfaz se transferirá "serializada" del maestro al trabajador.

package interfaces;

public interface Task  {
    public String execute(String t) throws Exception;
Observe que esta interfaz, que no puede modificarse, proporciona un método (execute) para realizar el procesamiento de un dato devolviendo un resultado y pudiendo generar una excepción.

A continuación, se muestra la tarea asociada al trabajo simple en su versión genérica:

// Tarea muy simple (es ridículo ejecutarla de forma remota)
package jobs;

import java.io.*;
import interfaces.*;

public class SimpleTask implements Task<Double, Double>, Serializable {
    public static final long serialVersionUID = 1234567895;
    public Double execute(Double x) {
            return x * x;
    }
}
y en su versión basica:
// Tarea muy simple (es ridículo ejecutarla de forma remota)
package jobs;

import java.io.*;
import interfaces.*;

public class SimpleTask implements Task, Serializable {
    public static final long serialVersionUID = 1234567895;
    public String execute(String x) {
            Double d = Double.parseDouble(x);
            Double res = d * d;
            return res.toString();
    }
}
Como parte del material de apoyo, se proporcionan cuatro ejemplos de trabajos:

Arquitectura del software del sistema

Antes de pasar a presentar la funcionalidad que se debe desarrollar, se especifica en esta sección qué distintos componentes hay en este sistema.

En primer lugar, hay que recordar que la práctica está diseñada para no permitir la definición de nuevas clases (a no ser que se trate de clases anidadas), estando todas ya presentes, aunque mayoritariamente vacías, en el material de apoyo.

En las siguientes secciones se describe el contenido de cada uno de los cuatro directorios presentes en el material de apoyo de la práctica:

En cada directorio el código fuente está almacenado dentro del subdirectorio src y dentro de este, como es preceptivo, en un subdirectorio con el nombre del paquete. Con respecto a las clases compiladas, se almacenan dentro del subdirectorio bin.

Aunque se puede usar un IDE para el desarrollo de la práctica, como se explicó al principio del documento, en cada directorio existen scripts (compila.sh) para compilar el código fuente de ese directorio. Recuerde que en el nivel superior se dispone de otro script denominado compila_todo.sh que compila todo el software usando los scripts de compilación particulares de cada directorio.

Asimismo, en todos ellos, exceptuando el correspondiente a las interfaces, se proporciona un script (ejecuta.sh) para ejecutar el programa asociado a ese directorio y, en el directorio del nodo maestro, otro script para arrancar el Registry de RMI.

Con respecto al directorio de las interfaces, el script de compilación también genera un fichero de tipo JAR que está accesible en el resto de los directorios mediante un enlace simbólico.

Revisemos, a continuación, las clases contenidas en cada directorio.

Directorio interfaces

En este directorio se almacenan las clases que definen las interfaces usadas en la interacción entre los distintos componentes y que están ubicadas en el paquete interfaces:

Directorio manager_node

En este directorio se almacenan las clases que definen la funcionalidad del nodo gestor contenidas en el paquete manager:

Directorio worker_node

En este directorio se almacenan las clases que definen la funcionalidad del trabajador que están contenidas en el paquete worker: En este directorio hay un subdirectorio denominado Ficheros que representa el hipotético sistema de ficheros distribuido/paralelo accesible desde todos los nodos trabajadores.

Directorio master_node

En este directorio se almacena la clase que define la funcionalidad del maestro (clase Master englobada en el paquete master), así como todos ejemplos de trabajos y tareas ya presentados anteriormente (incluidos en el paquete jobs donde también se encuentra la clase abstracta Job): En este directorio hay un subdirectorio denominado Ficheros que representa el hipotético sistema de ficheros local del nodo maestro.

Funcionalidad de la práctica

Como se comentó al principio del documento, se plantean dos versiones del sistema planteado: una básica, que usa datos y resultados de tipo String, y una avanzada, basada en genéricos, que permite datos y resultados de cualquier tipo. Recuerde que se puede optar por solo realizar la parte básica, por completar ambas de forma incremental o por afrontar directamente la versión genérica sin pasar por la básica pudiendo obtener también de esta forma la nota máxima.

En esta sección, se detalla la funcionalidad que se requiere implementar presentándola en varios pasos sucesivos. Se plantea de esta forma para ir incorporando la funcionalidad de manera incremental, aunque uno puede optar por afrontar desde el principio toda la funcionalidad.

Primer paso: ejecución local de un trabajo

En este primer paso no vamos a realizar ningún procesamiento distribuido sino simplemente ejecutar el trabajo en el propio nodo maestro. Esto permitirá conocer mejor los trabajos y las tareas e implementar el ciclo de vida de un trabajo. Evidentemente, toda la labor asociada a este paso se implementa únicamente en la clase Master: Para probar la funcionalidad de este primer paso se debería verificar que ejecutan correctamente en local todos los trabajos. Asegúrese de que en el trabajo FileAlphaJob se detecta y propaga la excepción FileNotFoundException cuando el fichero a procesar no existe en el sistema de ficheros accesible por el trabajador.

Segundo paso: ejecución remota secuencial de un trabajo usando un solo trabajador

En este paso ya entran en juego los tres roles y el sistema ya empieza a tener un modo de operación distribuido. Nótese que por el momento solo se va a plantear una funcionalidad mínima para el nodo gestor: simplemente la capacidad para que un único trabajador se dé de alta y para que el maestro pueda obtener la referencia remota a ese trabajador. En el próximo paso desarrollaremos la funcionalidad completa del gestor. A continuación, se especifican los cambios que se deben realizar: En este paso puede volver a ejecutar todas las tareas y verificar que siguen funcionando correctamente pero esta vez se ejecutan en el contexto del trabajador remoto.

Es interesante resaltar que en este paso se puede apreciar uno de los mecanismos que hacen a RMI una herramienta adecuada para este tipo de escenarios: la descarga dinámica y automática del código de las clases. El maestro envía un objeto de una clase que implementa la interfaz Task al trabajador y este puede invocar un método de la misma a pesar de no disponer localmente de su código. Si revisa el script que permite ejecutar un trabajador, en el mismo se puede ver la propiedad codebase que permite especificar de qué dirección, local o remota, se puede descargar el código de las clases no presentes en el nodo local:

java.rmi.server.codebase=file:../../master_node/bin/

Tercer paso: ejecución remota secuencial de un trabajo usando múltiples trabajadores

En este paso completaremos la funcionalidad del gestor que debe permitir: Se va a cambiar también el maestro para que vaya solicitando la ejecución remota de forma rotatoria entre los trabajadores asignados.

Pruebe de nuevo los trabajos pero esta vez con múltiples trabajadores y compruebe cómo se reparte el procesamiento entre ellos, aunque de forma secuencial lo cual no es muy útil.

Cuarto paso: ejecución asíncrona remota de una tarea

La clave para lograr que este sistema sea útil es habilitar la ejecución asíncrona remota de una tarea: el maestro contacta con un trabajador para que inicie la ejecución de una tarea y este le devuelve el control inmediatamente, activando la ejecución de la tarea en el contexto de un thread. Completada la tarea, el trabajador usará un mecanismo de tipo callback (interfaz remota TaskCB) que incluirá un método remoto para notificar al maestro de la finalización de la tarea informándole del resultado, o posible excepción, de la misma.

Este modo de operación asíncrono requiere, por tanto, que en el nodo maestro se implemente la interfaz de tipo callback y que se añada a la petición de ejecución remota una referencia a un objeto de este tipo,

Asimismo, hay que tener en cuenta que este esquema asíncrono requiere poder identificar a qué petición corresponde una respuesta. Para ello, dado que se ha optado por un modelo en el que un trabajador solo puede estar ejecutando una tarea en cada instante, bastaría con enviar la identificación del trabajador como parte de la respuesta. Nótese que a partir de ese identificador el maestro debe deducir a qué dato de entrada corresponde esa respuesta.

La ejecución remota en el contexto de un thread también trae como consecuencia que la excepción que pueda producirse durante la ejecución de la tarea haya que enviarla explícitamente como parte de la respuesta.

Nótese que para este paso nos vamos a conformar con ejecutar un trabajo con una sola tarea y un único trabajador comprobando que somos capaces de recibir de forma asíncrona el resultado, o posible excepción, asociado a la ejecución remota asíncrona de esa tarea. En el siguiente, y último, paso ya se planteará la solución general con múltiples tareas y trabajadores.

A continuación, se especifican algunos de los cambios que se deben realizar en ese paso:

Como se ha comentado previamente, en esta sección nos conformamos con procesar un trabajo con una única tarea por lo que bastaría con incorporar un mecanismo de sincronización que permita a la clase maestra, una vez enviada esa única tarea, esperar por el resultado de la misma.

Quinto paso: versión final

En este paso final, que solo afecta a la clase maestra, ya podemos afrontar toda la funcionalidad del sistema, teniendo que contemplar aspectos tales como: En este paso, alcanzamos finalmente el modo de operación característico de un proceso que ejerce el rol de maestro:

Material de apoyo de la práctica

El material de apoyo de la práctica se encuentra en este enlace.

Al descomprimir el material de apoyo se crea el entorno de desarrollo de la práctica, que reside en el directorio: $HOME/DATSI/SD/MasterWorker.2020/.

Entrega de la práctica

Se realizará en la máquina triqui, usando el mandato:
entrega.sd MasterWorker.2020

Este mandato recogerá los siguientes ficheros: