Introducción a la programación MapReduce en Hadoop

Introducción

El objetivo de esta guía es presentar varios ejemplos sencillos que permitan familiarizarse con los conceptos fundamentales del desarrollo de programas en el entorno MapReduce de Java, concretamente, en la implementación de Hadoop. Esta guía asume que el lector ya conoce los aspectos básicos del modelo MapReduce. En caso contrario, se recomienda que consulte el artículo original que propone este modelo de programación paralela dentro del ámbito de Google (MapReduce: Simplified Data Processing on Large Clusters de Jeffrey Dean y Sanjay Ghemawat), en cuyas ideas se basa la implementación de MapReduce de libre distribución incluida en Hadoop.

Bajo ningún concepto se pretende que esta guía constituya un curso exhaustivo sobre cómo programar en este entorno, estando disponible en Internet numerosa documentación sobre este tema, y menos sobre el despliegue de programas MapReduce sobre un cluster. A continuación, se incluyen tres referencias típicas para profundizar en todos estos aspectos.

En este enlace dispone de los ejemplos usados en esta guía.

Primeros pasos

Lo primero, evidentemente, es instalar el entorno Hadoop, donde está incluido MapReduce. En principio, dado que el interés de esta presentación es sólo la programación, de las tres instalaciones posibles (local, todo ejecuta en una única JVM; en localhost, se despliegan todos los procesos de Hadoop pero sólo en un único nodo; o en un cluster real), optaremos por la instalación local (standalone).

Esta referencia describe cómo instalar este entorno en configuraciones de tipo local o localhost.

Sin embargo, para agilizar el trabajo al lector, se va a partir de un entorno ya instalado en triqui, de manera que éste sólo tenga que definir algunas variables de entorno en su cuenta para poder utilizarlo. Concretamente, debería incluir las siguientes definiciones en el fichero .bashrc (o equivalente) de su cuenta:


export HADOOP_PREFIX=~ssoo/ppd/hadoop-2.5.1
PATH=$PATH:$HADOOP_PREFIX/bin

Pruebe a ejecutar el mandato hadoop y asegúrese de que imprime información sobre su uso.

El MapReduce nulo

Para entender mejor el modo de operación de MapReduce, comenzamos desarrollando un programa (Null.java) que, en principio, no hace nada, dejando, por tanto, que se ejecute un trabajo MapReduce con todos sus parámetros por defecto.
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Null {
	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
      			System.err.println("Uso: null in out");
			System.exit(2);
		}
		// Crea un trabajo MapReduce
		Job job = Job.getInstance(); 
		// Especifica el JAR del mismo
		job.setJarByClass(Null.class);

		// Especifica directorio de entrada y de salida
		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Arranca el trabajo y espera que termine
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Cree un directorio (por ejemplo, input) con varios ficheros de texto y pruebe a compilar y ejecutar este programa especificando como primer parámetro el nombre de ese directorio y como segundo el nombre de un directorio, que no debe existir previamente, donde quedará la salida del trabajo:
javac  -cp `hadoop classpath` *.java  # compila
jar cvf Null.jar *.class # crea el JAR
hadoop jar Null.jar Null input output # nombre del JAR, de la clase principal y args del programa

Eche un vistazo al contenido del directorio de salida, donde, entre otros, habrá un fichero denominado part-r-00000. ¿Qué relación ve entre el contenido de este fichero y los ficheros de texto usados en la prueba? Pronto volveremos con ello pero antes revisemos cuáles son los valores por defecto de un trabajo MapReduce.

Los valores por defecto para un trabajo MapReduce corresponden a haber incluido en el programa las siguientes sentencias:

job.setInputFormatClass(TextInputFormat.class);
Este formato de entrada es de tipo texto y considera cada línea del fichero como un registro invocando, por tanto, la función map del programa por cada línea pasándole como clave el offset dentro del fichero que corresponde al principio de la línea (de tipo LongWritable; Writable es el tipo serializable que usa MapReduce para gestionar todos los datos; en este caso, se corresponde con un long) y como valor el contenido de la línea (de tipo Text, la versión Writable del String).
job.setMapperClass(Mapper.class);
Es un Mapper identidad, que simplemente copia la clave y el valor recibido.
job.setMapOutputKeyClass(LongWritable.class);
El tipo de datos de la clave generada por map. Dado que la función map usada copia la clave recibida, es de tipo LongWritable.
job.setMapOutputValueClass(Text.class);
El tipo de datos del valor generado por map. Dado que la función map usada copia el valor recibido, es de tipo Text.
job.setPartitionerClass(HashPartitioner.class);
Es la clase que realiza la asignación de claves a reducers, usando una función hash para ello.
job.setNumReduceTasks(1);
Sólo usa un Reducer; por eso, hay un único fichero en el directorio de salida.
job.setReducerClass(Reducer.class);
Es un Reducer identidad, que simplemente copia la clave y el valor recibido.
job.setOutputKeyClass(LongWritable.class);
El tipo de datos de la clave generada por reduce y por map excepto si se ha especificado uno distinto para map usando setMapOutputKeyClass. Dado que la función reduce usada copia la clave recibida, es de tipo LongWritable.
job.setOutputValueClass(Text.class);
El tipo de datos del valor generado por reduce y por map excepto si se ha especificado uno distinto para map usando setMapValueKeyClass. Dado que la función reduce usada copia el valor recibido, es de tipo Text.
job.setOutputFormatClass(TextOutputFormat.class);
Este formato de salida es de tipo texto y consiste en la clave y el valor separados, por defecto, por un tabulador (para pasar a texto los valores generados por reduce, el entorno de ejecución invoca el método toString de los respectivas clases Writables).

Se sugiere al lector que modifique el código de Null.java para especificar dos reducers y lo ejecute analizando la salida producida por el programa.

Para terminar esta primera toma de contacto, hay que explicar que el mandato hadoop gestiona sus propios argumentos de línea (veremos un ejemplo en la siguiente sección) y, por tanto, es necesario separar dentro de los argumentos de línea aquellos que corresponden a Hadoop y los que van destinados a la aplicación. La clase Tool facilita este trabajo. A continuación, se presenta la nueva versión de la clase Null.java usando este mecanismo.


import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;

public class Null extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		if (args.length != 2) {
      			System.err.println("Usage: null in out");
			System.exit(2);
		}
                Job job = Job.getInstance(getConf()); // le pasa la config.

		job.setJarByClass(getClass()); // pequeño cambio

		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		int resultado = ToolRunner.run(new Null(), args);
		System.exit(resultado);
	}
}

Contando palabras: el hola mundo del MapReduce

El recuento de qué palabras aparecen, y cuántas veces cada una, en una colección de ficheros es el ejemplo por antonomasia en un curso de introducción a la programación MapReduce. A continuación, se muestra el código (ejemplo3) tanto de la clase principal (WC.java) como de las que corresponden con el mapper (WCMapper.java) y el reducer (WCReducer.java) de esta aplicación. (NOTA: este ejemplo se corresponde básicamente con el presentado en la figura 3.1 del libro de Jimmy Lin, con la diferencia de que en ese caso usa un formato de entrada que considera que cada documento de entrada es un registro, es decir, una invocación a la función map, mientras que el formato por defecto de Hadoop usa la línea como registro).
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;

public class WC extends Configured implements Tool {
        public int run(String[] args) throws Exception {
		if (args.length < 2) {
      			System.err.println("Uso: wc in [in...] out");
			System.exit(2);
		}
                Job job = Job.getInstance(getConf());

		job.setJarByClass(getClass());

		job.setMapperClass(WCMapper.class);
		job.setCombinerClass(WCReducer.class); // Uso de un combiner
		job.setReducerClass(WCReducer.class);

		// Mismo tipo de salida para Mapper y Reducer
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		for (int i = 0; i < args.length-1; ++i)
			FileInputFormat.addInputPath(job, new Path(args[i]));
    		FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));
		return job.waitForCompletion(true) ? 0 : 1;
	}
        public static void main(String[] args) throws Exception {
                int resultado = ToolRunner.run(new WC(), args);
                System.exit(resultado);
        }
}

El código de esta clase principal es similar al ejemplo anterior; sólo destacar dos aspectos: el uso de un combiner, implementado por la misma clase que el reducer, y la especificación de varios directorios de entrada.
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.StringTokenizer;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException {
                StringTokenizer st = new StringTokenizer(valor.toString());
                while (st.hasMoreTokens())
                        contexto.write(new Text(st.nextToken()), new IntWritable(1));
	}
}

Nótese que en la definición de la clase mapper se especifican los tipos de la clave y valor de los datos de entrada, así como los de la clave y valor generados. En cuanto a la función map, recibe la clave y valor de entrada, junto con un parámetro de tipo Context que le proporciona el acceso a la funcionalidad del entorno. En este caso, al método write que usa para emitir los datos de salida hacia los reducers.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
	public void reduce(Text clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable valor : valores)
			sum += valor.get();
		contexto.write(clave, new IntWritable(sum));
	}
}

Obsérvese que en la definición de la clase reducer se especifican los tipos de la clave y valor de sus datos de entrada, así como los de la clave y valor generados. En cuanto a la función reduce, recibe la clave asignada asi como una colección iterable con los valores asociados a esa clave, junto con un parámetro de tipo Context que le proporciona el acceso a la funcionalidad del entorno. En este caso, al método write que usa para emitir los datos finales.

Contadores y configuración

En esta sección, vamos a extender la funcionalidad del ejemplo anterior de manera que nos permita introducir aspectos adicionales de la programación MapReduce, concretamente, el uso de contadores y la posibilidad de que el usuario pueda especificar información de configuración para la aplicación.

Se plantea añadir las siguientes dos características a la funcionalidad de la aplicación:

Ambas funcionalidades se resolverán con el uso de contadores de usuario y, en el caso de la segunda, además, con la obtención de argumentos de configuración de línea de mandatos.

A continuación, se muestra el código del mapper, que es la única clase que cambia al incluir esta funcionalidad (ejemplo4).


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.StringTokenizer;
 
// tipo de datos para implementar los contadores
enum incidencias {
	LINEAS_VACIAS,
	PALABRAS_CORTAS
}

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	private int longMin;
	// función invocada al principio de la ejecución de este mapper
	public void setup(Context contexto) throws IOException, InterruptedException {
		// obtiene el parámetro de configuración
		longMin = contexto.getConfiguration().getInt("longitud", 1);
	}
	public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException {
		if (valor.getLength() == 0)
			// incrementa el contador de líneas vacías
			contexto.getCounter(incidencias.LINEAS_VACIAS).increment(1);
		else {
                	StringTokenizer st = new StringTokenizer(valor.toString());
                	while (st.hasMoreTokens()) {
				String palabra = st.nextToken();
				if (palabra.length() >= longMin)
                        		contexto.write(new Text(palabra), new IntWritable(1));
				else
				// incrementa el contador de palabras cortas
					contexto.getCounter(incidencias.PALABRAS_CORTAS).increment(1);
			}
		}
	}
	// función invocada al final de la ejecución de este mapper
	public void cleanup(Context contexto) throws IOException, InterruptedException {
		return; // no hace nada; sólo por motivos pedagógicos
	}
}

Conviene resaltar los siguientes aspectos de esta nueva clase:

Para pasar el argumento de configuración, el programa se invocará de la siguiente forma:


hadoop jar WC.jar WC -D longitud=4 input output

Observe el valor de los contadores en los datos que se imprimen cuando se completa el trabajo.

Problemas de recuento

El programa desarrollado en la sección previa pertenece a la categoría de los problemas de recuento, quizás el problema más típico dentro del entorno MapReduce. En esta sección, se van a plantear problemas adicionales que se pueden englobar en esta categoría y que permitirán ir presentando nuevas características de este entorno y seguir practicando con el mismo.

Otro ejemplo de recuento

En esta sección se propone programar un trabajo que, usando la misma fuente de entrada que los programas previos, reciba como argumento una determinada palabra y realice el recuento de qué palabras aparecen justo a continuación de la misma (y en la misma línea) y cuántas veces. Nótese que se trata de un escenario que, aunque muy simplificado, pertenece a la categoría general de problemas de co-ocurrencia de términos.

A continuación, se muestra el código del mapper y del reducer para este ejemplo (ejemplo5).


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.StringTokenizer;
 
public class SigMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	private String termino;
	public void setup(Context contexto) throws IOException, InterruptedException {
		termino = contexto.getConfiguration().get("palabra", "");
	}
	public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException {
		boolean encontrada=false;
               	StringTokenizer st = new StringTokenizer(valor.toString());
               	while (st.hasMoreTokens()) {
			String palabra = st.nextToken();
			if (encontrada) {
                       		contexto.write(new Text(palabra), new IntWritable(1));
				encontrada=false;
			}
			if (palabra.equals(termino))
				encontrada=true;
		}
	}
}

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SigReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
	public void reduce(Text clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable valor : valores)
			sum += valor.get();
		contexto.write(clave, new IntWritable(sum));
	}
}

Uso de claves numéricas

El siguiente ejemplo plantea desarrollar un programa que, usando nuevamente la misma fuente de entrada que la utilizada en el programa que recuenta el número de palabras, obtenga cuántas palabras hay de cada tamaño. Para seguir practicando con la gestión de argumentos y contadores, vamos a añadirle la posibilidad de que el usuario especifique un determinado término de manera que las palabras que comiencen por el mismo no sean tenidas en cuenta en el recuento de tamaños. Además, deberá gestionar un contador indicando cuántas veces se produce esa incidencia.

A continuación, se muestra el código del mapper y del reducer para este ejemplo (ejemplo6). Nótese el uso en ambos módulos de IntWritable para la clave, al ser ésta numérica.


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.StringTokenizer;
 
enum incidencias {
	PALABRAS_VETADAS
}

public class WLMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{
	private String vetada;
	public void setup(Context contexto) throws IOException, InterruptedException {
		vetada = contexto.getConfiguration().get("vetada");
	}
	public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException {
                StringTokenizer st = new StringTokenizer(valor.toString());
                while (st.hasMoreTokens()) {
			String palabra = st.nextToken();
			if ((vetada != null) && palabra.startsWith(vetada))
				contexto.getCounter(incidencias.PALABRAS_VETADAS).increment(1);
			else
                     		contexto.write(new IntWritable(palabra.length()), new IntWritable(1));
		}
	}
}

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WLReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
	public void reduce(IntWritable clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable valor : valores)
			sum += valor.get();
		contexto.write(clave, new IntWritable(sum));
	}
}

Formato de entrada KeyValueTextInputFormat

En esta sección se presenta un segundo formato de entrada de modo texto: el formato de entrada KeyValueTextInputFormat, que espera una entrada de texto orientada a líneas tal que la clave y el valor estén separados, por defecto, por un tabulador, proporcionando a la función map datos de tipo Text tanto para la clave como para el valor. Obsérvese que el formato de entrada KeyValueTextInputFormat es el idóneo para interpretar datos que se han generado mediante el formato de salida TextOutputFormat.

Para ilustrar este neuvo formato, supongamos que durante varios meses hemos estado ejecutando la aplicación WC sobre diversos contenidos de entrada y disponemos de los resultados de cada ejecución en un directorio independiente. Nos planteamos realizar una nueva aplicación que reciba como fuente de datos de entrada los distintos directorios de resultados y que lleve a cabo el recuento total de palabras.

A continuación, se muestra el código del mapper y del reducer (la clase principal es similar a la de ejemplos previos, únicamente especificando el nuevo formato de entrada usando job.setInputFormatClass(KeyValueTextInputFormat.class)) para este ejemplo (ejemplo7). Nótese el uso del formato de entrada KeyValueTextInputFormat y cómo esto afecta a los parámetros de mappers y reducers.


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.StringTokenizer;
 

public class WCTotMapper extends Mapper<Text, Text, Text, IntWritable>{
	public void map(Text clave, Text valor, Context contexto) throws IOException, InterruptedException {
                contexto.write(clave,
			new IntWritable(Integer.parseInt(valor.toString())));
	}
}

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCTotReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
	public void reduce(Text clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable valor : valores)
			sum += valor.get();
		contexto.write(clave, new IntWritable(sum));
	}
}

Combiners y operaciones no asociativas

En los ejemplos vistos hasta el momento, en la clase principal se activa un combiner especificando la clase reducer para que actúe en ese rol, optimizando, generalmente, la ejecución del trabajo al poder realizar agragaciones parciales antes de la ejecución de los reducers. Sin embargo, no todas las operaciones permiten estas agregaciones parciales: si la operación de agregación no es asociativa (como, por ejemplo, el cálculo de la media), el uso de un combiner dará lugar a resultados incorrectos. Este apartado plantea un ejemplo de esa índole (véase la sección Algorithmic Correctness with Local Aggregation del libro de Jimmy Lin).

Se plantea crear una versión modificada del programa anterior que calcule la media asociada a cada palabra, es decir, el promedio entre el número total de veces que aparece y el número de recuentos en los que está presente, redondeada a entero.

A continuación, se muestra sólo el código del reducer para este ejemplo (ejemplo8) (NOTA: este ejemplo se corresponde básicamente con el presentado en la figura 3.4 del libro de Jimmy Lin).


import java.io.IOException;

emport org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCMedReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
	public void reduce(Text clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException {
		int sum = 0;
		int n = 0;
		for (IntWritable valor : valores) {
			sum += valor.get();
			++n;
		}
		int media = (sum + n - 1)/n; // truco para que redondee
		contexto.write(clave, new IntWritable(media));
	}
}

Nótese que en la clase principal se ha dejado comentada la sentencia que activa el uso de combiners puesto que, dado el carácter no asociativo de la operación media, el resultado con combiners sería incorrecto.

Para poder comprobar el problema, vamos a suponer que en la entrada de datos de este trabajo puede aparecer en una partición asignada a un mapper el resultado de varios recuentos, para, de esta manera, asegurarnos de que aparece el problema al hacer que un mismo mapper se pueda encontrar con varias apariciones del mismo término (lo que forzaría que el combiner hiciera la media de más de un elemento).

Se sugiere, por tanto, que el lector concatene en un mismo fichero de entrada los resultados de varios recuentos (realmente, juntar los datos de entrada en un mismo fichero, es una técnica recomendable en este entorno puesto que el uso de ficheros de tamaño pequeño no es eficiente, al lanzarse un mapper por cada uno) y active el uso de combiners.

¿Tenemos que renunciar a la optimización que proporciona el uso de combiners para este ejemplo? La respuesta es afortunadamente negativa, tal como se analiza en el siguiente apartado.

Combiners y reducers diferentes

Como se explica en la sección Algorithmic Correctness with Local Aggregation del libro de Jimmy Lin, hay soluciones a este problema. En la figura 3.6 de este libro, se muestra el pseudo-código que permite usar combiners para resolver este problema. Del mismo, destacamos las siguientes características: Entramos ya con la solución (ejemplo9) que, como se comentó previamente, se corresponde básicamente con el presentado en la figura 3.4 del libro de Jimmy Lin. En la misma, hay una clase diferente para el combiner y el reducer al no tener estos la misma funcionalidad (el reducer calcula la media y el combiner no). Hemos aprovechado esta separación para hacer un pequeño cambio en el enunciado: la media se calculará como un número en coma flotante, lo que implica que el tipo del valor de salida del reducer sea de tipo DoubleWritable.

En primer lugar, será necesario definir un nuevo tipo Writable para almacenar la pareja de enteros (IntPair.java). Aunque en esta guía no vamos a explicar en profundidad cómo definir un nuevo Writable (en Internet hay mucha información sobre este tema y, lo que es mejor, hay un montón de Writables ya programados para casi todo lo que se nos pueda ocurrir), sí se considera conveniente resaltar algunos aspectos sobre esta labor basándonos en la clase que se presenta a continuación. Además de los típicos constructores y funciones de tipo get, nos encontramos con las funciones readFields y write que llevan a cabo, respectivamente, la deserialización y serialización de los datos, basándose meramente en invocar esas mismas funciones sobre los campos que componen la clase. Asimismo, si queremos que ese tipo de datos pueda ser almacenado en un fichero de texto, es necesario definir el método toString.


import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Writable; 
 
public class IntPair implements Writable { 
	 
	private IntWritable first; 
	private IntWritable second; 
	 
	public IntWritable getFirst() { 
		return first; 
	} 
	public IntWritable getSecond() { 
		return second; 
	} 
	 
	public IntPair(){ 
		first = new IntWritable(); 
		second  = new IntWritable(); 
	} 

	public IntPair(int first, int second){ 
		this(new IntWritable(first), new IntWritable(second)); 
	} 
	 
	public IntPair(IntWritable first, IntWritable second){ 
		this.first = first; 
		this.second = second; 
	} 
	 
	public void readFields(DataInput in) throws IOException { 
		first.readFields(in); 
		second.readFields(in); 
	} 
	public void write(DataOutput out) throws IOException { 
		first.write(out); 
		second.write(out); 
		 
	} 
	public String toString() { 
		return first.toString() + "\t" + second.toString(); 
	} 
} 

En cuanto a la clase principal, los únicos aspectos relevantes son el uso de clases separadas para el combiner y el reducer, así como la especificación de un tipo distinto para el valor de salida del mapper y del reducer.
		job.setMapperClass(WCMedMapper.class);
		job.setCombinerClass(WCMedCombiner.class);
		job.setReducerClass(WCMedReducer.class);
		job.setMapOutputValueClass(IntPair.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);

A continuación, se muestra el código de la clase mapper, combiner y reducer para este ejemplo.
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.StringTokenizer;
 
public class WCMedMapper extends Mapper<Text, Text, Text, IntPair>{
	public void map(Text clave, Text valor, Context contexto) throws IOException, InterruptedException {
		int val = Integer.parseInt(valor.toString());
                contexto.write(clave,
			new IntPair(val, 1));
	}
}

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCMedCombiner extends Reducer<Text,IntPair,Text,IntPair> {
	public void reduce(Text clave, Iterable<IntPair> valores, Context contexto) throws IOException, InterruptedException {
		int sum = 0;
		int n = 0;
		for (IntPair valor : valores) {
			sum += valor.getFirst().get();
			n += valor.getSecond().get();
		}
		contexto.write(clave, new IntPair(sum,n));
	}
}

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCMedReducer extends Reducer<Text,IntPair,Text,DoubleWritable> {
	public void reduce(Text clave, Iterable<IntPair> valores, Context contexto) throws IOException, InterruptedException {
		int sum = 0;
		int n = 0;
		for (IntPair valor : valores) {
			sum += valor.getFirst().get();
			n += valor.getSecond().get();
		}
		double media = (double)sum/n;
		contexto.write(clave, new DoubleWritable(media));
	}
}

Como se comentó previamente, en vez de realizar una clase combiner y una reducer totalmente independientes, es más razonable que compartan su funcionalidad común. En este subdirectorio, se muestra una versión que usa una clase auxiliar que incluye la funcionalidad requerida por la clase combiner y una reducer y sirve de soporte para ambas.

Mappers y Reducers con estado

El modelo MapReduce plantea una esquema sin estado: cada invocación de la función map/reduce recibe una pareja clave-valor y genera, a su vez, parejas clave-valor. Ese esquema es parte de la idiosincrasia de MapReduce logrando gran escalabilidad, al no requerir almacenar en memoria ninguna información de estado.

Sin embargo, si lo considera oportuno para facilitar el procesamiento, el programador puede incluir estado tanto en un mapper como en un reducer, pero siempre valorando si el tamaño que ocupa esa información de estado en memoria puede causar problemas de escalabilidad en la aplicación.

A continuación, se muestra una versión del mapper (ejemplo10) del programa que cuenta palabras que usa estado para combinar los resultados (en el libro de Jimmy Lin a esta técnica se la denomina in-mapper combining y corresponde al programa de la figura 3.3).


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.StringTokenizer;
import java.util.HashMap;
import java.util.Map;
 
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	private HashMap<String, Integer> tabla;

	public void setup(Context contexto) throws IOException, InterruptedException {
		tabla = new HashMap<String, Integer>();
	}
	public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException {
               	StringTokenizer st = new StringTokenizer(valor.toString());
               	while (st.hasMoreTokens()) {
			String palabra = st.nextToken();
			Integer c = tabla.get(palabra);
        		if (c==null)
				c = new Integer(1); 
			else
				c = new Integer(++c); 
                	tabla.put(new String(palabra), c);
		}
	}
	public void cleanup(Context contexto) throws IOException, InterruptedException {
		for(Map.Entry<String, Integer> palabra : tabla.entrySet())
                        contexto.write(new Text(palabra.getKey()),
				new IntWritable(palabra.getValue()));

		return;
	}
}

Como se comentó previamente, este esquema presenta problemas de escalabilidad y es preferible, por tanto, usar combiners. Sin embargo, hay que tener en cuenta que Hadoop no garantiza nada sobre la ejecución de combiners: podrían ejecutarse múltiples veces o ninguna.

Proyección, transformación y filtrado: Mappers sin Reducers

No siempre es necesario usar reducers. Aquellas aplicaciones que no requieran realizar ningún tipo de agregación sobre los datos originales sino llevar a cabo operaciones de transformación o filtrado de los mismos sólo necesitan usar mappers:
job.setNumReduceTasks(0);

A continuación, se muestra una aplicación que pasa a mayúsculas todo el contenido del repositorio de datos de entrada (ejemplo11). En primer lugar, se muestra el código de la clase principal y, acto seguido, el del mapper de esta aplicación.
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;

public class Mayus extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		if (args.length != 2) {
      			System.err.println("Usage: mayus in out");
			System.exit(2);
		}
		Job job = Job.getInstance(getConf());
		job.setJarByClass(getClass());

		job.setMapperClass(MayusMapper.class);

		job.setNumReduceTasks(0);

		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		return job.waitForCompletion(true) ? 0 : 1;
	}
	public static void main(String[] args) throws Exception {
		int resultado = ToolRunner.run(new Mayus(), args);
		System.exit(resultado);
	}
}

Observe que el número de reducers se ha fijado como cero.
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MayusMapper extends Mapper<LongWritable, Text, NullWritable, Text>{
	public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException {
 		String linea = valor.toString().toUpperCase(); 
        	contexto.write(NullWritable.get(), new Text(linea));
	}
}

Nótese en el código del mapper el uso de la clase NullWritable para generar una clave vacía (queremos mantener el contenido original del fichero pero transformado a mayúsculas; si escribieramos la clave recibida, en los datos de salida aparecería el offset). Obsérvese también como en el directorio de salida hay un fichero por cada mapper.

A continuación, se plantea un problema de las mismas características (ejemplo12) que realiza el filtrado de los datos de entrada dejando en la salida sólo aquellas líneas que incluyan un determinado valor que se recibe como argumento (similar al grep de UNIX). A continuación, se muestra el código del mapper de este ejemplo.


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class GrepMapper extends Mapper<LongWritable, Text, NullWritable, Text>{
	private String cadena;
        public void setup(Context contexto) throws IOException, InterruptedException {
                cadena = contexto.getConfiguration().get("cadena");
        }

	public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException {
		if (valor.toString().contains(cadena))
        		contexto.write(NullWritable.get(), valor);
	}
}

Encadenando trabajos

Frecuentemente, una aplicación requiere realizar varios trabajos MapReduce encadenados entre sí, de manera que la salida de uno sea la entrada del siguiente. Para lograr este encadenamiento, basta con configurar y activar cada trabajo cuando termina el previo especificando como entrada del mismo el directorio de salida del anterior.

A continuación, se plantea como ejemplo una aplicación que requiere la ejecución encadenada de dos trabajos:

  1. Un primer trabajo, que corresponde con el planteado en un ejercicio previo, realiza el recuento de qué palabras aparecen a continuación de una dada. A este trabajo, le hemos añadido que gestione un contador que acumule cuántas veces aparece la palabra dada.
  2. El segundo trabajo normalizará los datos generados por el primero de manera que, en vez de expresar los resultados como número de apariciones absolutas, lo hará como porcentajes con respecto al número total de ocurrencias del término dado (algo como, por ejemplo, el 30% de las veces, después de la palabra casa, aparece el término rural). Nótese que se trata de un trabajo que no requiere reducers.
Para lograr encadenar estos trabajos, la clase principal, cuando se complete el primer trabajo, leerá el contador que refleja el número total de apariciones de la palabra dada y configurará el segundo trabajo especificando: A continuación, se muestra la clase principal (Jobs.java) de esta aplicación que activa en cadena los dos trabajos comentados (ejemplo13).
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;

public class Jobs extends Configured implements Tool {
        public int run(String[] args) throws Exception {
		if (args.length < 2) {
      			System.err.println("Uso: jobs in [in...] out");
			System.exit(2);
		}
		
		Job job = Job.getInstance(getConf());

		job.setJarByClass(getClass());

		job.setMapperClass(SigMapper.class);
		job.setCombinerClass(SigReducer.class);
		job.setReducerClass(SigReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		for (int i = 0; i < args.length-1; ++i)
			FileInputFormat.addInputPath(job, new Path(args[i]));

		// se usa un directorio intermedio para que el primer
		// trabajo deje sus datos al segundo
		Path tmp = new Path(args[args.length-1] + "-tmp");
    		FileOutputFormat.setOutputPath(job, tmp);

		if (!job.waitForCompletion(true))
			return 1;

		long total = job.getCounters().findCounter(contador.NUM_PALABRAS).getValue();
		job = Job.getInstance(getConf());


		job.setJarByClass(getClass());

		job.setInputFormatClass(KeyValueTextInputFormat.class);
		job.setMapperClass(NormMapper.class);

		job.setNumReduceTasks(0);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);

		FileInputFormat.addInputPath(job, tmp);
    		FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));

		job.getConfiguration().setLong("total", total);

		return job.waitForCompletion(true) ? 0 : 1;
	}
        public static void main(String[] args) throws Exception {
                int resultado = ToolRunner.run(new Jobs(), args);
                System.exit(resultado);
        }
}
Observe que el primer trabajo usa tanto mapper como reducer (SigMapper.java y SigReducer.java, respectivamente), mientras que el segundo, que realiza la normalización, sólo requiere mapper (NormMapper.java). Nótese, asimismo, el uso de un directorio temporal para pasar los datos del primer trabajo al segundo. Por razones pedagógicas, no se ha borrado el directorio temporal.

Co-ocurrencia de términos: claves y valores compuestos

En esta sección analizamos el problema de la determinación de la co-ocurrencia de términos en un cierto conjunto de documentos, es decir, calcular, por cada palabra que aparece en el repositorio de entrada, qué otras palabras (y cuántas veces) aparecen simultáneamente con dicha palabra en un determinado ámbito (en el mismo párrafo, en la misma frase, en una ventana de n palabras de distancia, etc.).

La resolución de este problema en MapReduce presenta ciertas características interesantes que pueden aplicarse a otro tipo de problemas.

Para simplificar el problema, tal como hicimos en un ejercicio previo, por cada palabra del repositorio de entrada, sólo vamos a considerar como co-ocurrente aquélla que aparece justo a continuación de la misma.

Se trata, por tanto, de una generalización del ejercicio previo que realizaba el recuento de qué palabras aparecen a continuación de una dada: en este caso, lo haremos para todas las palabras. Además, calcularemos el total por cada palabra clave.

Tomando como partida la solución de ese ejercicio anterior, la principal diferencia es que en este caso el mapper sí que tiene que emitir la palabra que está analizando en ese momento (en el ejemplo previo, no era necesario puesto que era una palabra especificada por el usuario). Por tanto, debería generar tres datos: la palabra que está analizando, la siguiente y un contador a 1 para realizar el acumulado. ¿Cómo vamos a organizar esos tres datos?

Dependiendo de ello, vamos a plantear tres soluciones:

  1. Una primera que envía la palabra que se está analizando como clave y la pareja formada por la siguiente palabra y el contador como valor. Será necesario definir un nuevo tipo Writable para almacenar la pareja (palabra, contador). Como se verá en el siguiente apartado, esta solución requiere estado en el reducer.
  2. Una segunda que envía la pareja formada por las dos palabras como clave y el contador como valor. Habrá que definir, en este caso, un nuevo tipo WritableComparable (se requiere que sea también Comparable ya que va a actuar como clave) para almacenar el par con las dos palabras. En este caso, la solución no requiere estado. Esta estrategia se corresponde básicamente con el pseudo-código presentado en la figura 3.8 del libro de Jimmy Lin, a la que denomina solución basada en pairs.
  3. La tercera estrategia intenta reducir la información que viaja entre mappers y reducers haciendo que sólo se transfiera una vez la palabra que actúa de clave enviando con ella una estructura compleja que guarda qué palabras aparecen justo a continuación y cuántas veces. Para este caso, usaremos una de las colecciones de tipo Writable que proporciona Hadoop: MapWritable. La estrategia usada en esta solución se corresponde básicamente con el pseudo-código presentado en la figura 3.9 del libro de Jimmy Lin, a la que denomina solución basada en stripes (en el sentido de que se está enviando la fila de una hipotética matriz que almacenara en cada componente (i,j) cuántas veces aparece la palabra j justo a continuación de la palabra i.
Un último aspecto a destacar es que este problema, tal como se ha planteado, requiere que el combiner y el reducer sean diferentes puesto que éste último tiene también que calcular, y generar, el total acumulado por cada palabra clave.

Co-ocurrencia: solución con estado en el reducer

Esta solución (ejemplo14) define un nuevo tipo Writable que permite almacenar la pareja formada por la palabra y el contador (TextIntPair.java).

La clase principal (Sig.java) es igual que la del ejercicio previo pero especificando como valor de salida del mapper y del reducer un tipo TextIntPair en vez de un IntWritable.

A continuación, se muestra el mapper (SigMapper.java) que es similar al del ejercicio previo pero extendido a todas las palabras y usando un TextIntPair como valor.


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.StringTokenizer;
 
public class SigMapper extends Mapper<LongWritable, Text, Text, TextIntPair>{
	public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException {
               	StringTokenizer st = new StringTokenizer(valor.toString());
		if (st.hasMoreTokens()) {
			String previa = st.nextToken();
               		while (st.hasMoreTokens()) {
				String palabra = st.nextToken();
                       		contexto.write(new Text(previa),
					new TextIntPair(palabra, 1));
				previa = palabra;
			}
		}
	}
}

El reducer, sin embargo, sí que presenta importantes diferencias. Téngase en cuenta que la función reduce será invocada por cada palabra a analizar recibiendo como valor la lista iterable de qué palabras aparecen justo a continuación de la misma con su contador de apariciones (nótese que si no se aplican combiners este contador será 1 pero puede ser mayor en caso contrario). El problema es que esa lista de valores puede aparecer en cualquier orden (el MapReduce de Hadoop, a diferencia del de Google, no permite solicitar que se ordenen los valores asociados a una clave). Por tanto, para poder hacer el acumulado, hay que usar alguna estructura de datos que permita acumular los contadores mientras se itera sobre los valores.

En cuanto al cálculo del total para cada palabra, requerido por el reducer pero que no debe estar presente en el combiner, es directo: basta con acumular el recuento de cada palabra que aparece a continuación dentro de la misma llamada reduce.

Dada que la funcionalidad del combiner y del reducer sólo se diferencia en el recuento del total por cada palabra clave que debe realizar el reducer, en este caso se ha optado por crear una clase base común (SigRedCom.java) que implemente la función reduce adecuada para ambos módulos, distinguiéndose por el valor especificado en su constructor. A continuación, se muestra esa clase común. Observe el uso de una estructura HashMap en cada invocación de reduce.


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.util.HashMap;
import java.util.Map;

public class SigRedCom extends Reducer<Text,TextIntPair,Text,TextIntPair> {
	private boolean isReducer=true;
	public SigRedCom(boolean reducer) {
		isReducer=reducer;
	}
	public void reduce(Text clave, Iterable<TextIntPair> valores, Context contexto) throws IOException, InterruptedException {
                HashMap<String, Integer> tabla = new HashMap<String, Integer>();
		int total=0;
		for (TextIntPair valor : valores) {
			String palabra = valor.getFirst().toString();
			int cuenta = valor.getSecond().get();
			Integer c = tabla.get(palabra);
                        if (c==null)
                                c = new Integer(cuenta);
                        else
                                c = new Integer(c + cuenta);
                        tabla.put(new String(palabra), c);
			if (isReducer) total+=cuenta;
		}
                for(Map.Entry<String, Integer> palabra : tabla.entrySet())
                        contexto.write(clave, new TextIntPair(palabra.getKey(),
                                palabra.getValue()));
		if (isReducer)
			contexto.write(clave, new TextIntPair("", total));
	}
}

Las clases SigReducer.java y SigCombiner.java simplemente se derivan de la clase base especificando en la llamada al constructor de la misma el tipo de comportamiento requerido.

Co-ocurrencia: solución sin estado

La solución previa requiere estado en el reducer debido a que el MapReduce no ordena los valores asociados a la misma clave. Para resolver esta limitación, un esquema habitual es lo que algunos denominan secondary sort, que consiste en mover el dato correspondiente (en esta caso, la palabra que viene justo a continuación en el texto de entrada) del campo valor al campo clave. Esta solución (ejemplo15) requiere, por tanto, un nuevo WritableComparable que incluya ambos campos y que defina una función de ordenación tal que primero tenga en cuenta la primera palabra a la hora de ordenar usando sólo la segunda en caso de que la primera sea igual. A continuación, se muestra el código del tipo WritableComparable que permite almacenar la pareja formada por las dos palabras usando una función de comparación que siga el esquema que se acaba de explicar (TextPair.java, extraído del libro de Tom White; nótese que, por simplicidad, no se ha incluido un raw comparator para hacer más eficiente la comparación). Nótese que, además de los métodos requeridos por un Writeble, es necesario especificar métodos que permitan ordenar las claves (compareTo y equals) y realizar la partición entre reducers (hashCode).
import java.io.*;
import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {
	private Text first;
	private Text second;
	public TextPair() {
		set(new Text(), new Text());
	}
	public TextPair(String first, String second) {
		set(new Text(first), new Text(second));
	}
	public TextPair(Text first, Text second) {
		set(first, second);
	}
	public void set(Text first, Text second) {
		this.first = first;
		this.second = second;
	}
	public Text getFirst() {
		return first;
	}
	public Text getSecond() {
		return second;
	}
	public void write(DataOutput out) throws IOException {
		first.write(out);
		second.write(out);
	}
	public void readFields(DataInput in) throws IOException {
		first.readFields(in);
		second.readFields(in);
	}
	public int hashCode() {
		return first.hashCode() * 163 + second.hashCode();
	}
	public boolean equals(Object o) {
		if (o instanceof TextPair) {
			TextPair tp = (TextPair) o;
			return first.equals(tp.first) && second.equals(tp.second);
		}
		return false;
	}
	public String toString() {
		return first + "\t" + second;
	}
	public int compareTo(TextPair tp) {
		int cmp = first.compareTo(tp.first);
		if (cmp != 0) {
			return cmp;
		}
		return second.compareTo(tp.second);
	}
}

A continuación, se muestra el mapper (SigMapper.java) que es similar al de la solución previa pero incluyendo la palabra que aparece a continuación dentro de la clave en vez de incluirla en el valor.
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.StringTokenizer;
 
public class SigMapper extends Mapper<LongWritable, Text, TextPair, IntWritable>{
	public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException {
               	StringTokenizer st = new StringTokenizer(valor.toString());
		if (st.hasMoreTokens()) {
			String previa = st.nextToken();
               		while (st.hasMoreTokens()) {
				String palabra = st.nextToken();
                       		contexto.write(new TextPair(previa, palabra),
					new IntWritable(1));
				previa = palabra;
			}
		}
	}
}

Con esta solución, la parte del reducer que hace el recuento de cada palabra que aparece a continuación se simplifica con respecto a la primera solución, siendo igual que la del ejercicio que realiza el recuento para una palabra dada, cambiando únicamente el tipo usado para la clave. Este recuento es común tanto para el reducer como el combiner por lo que, en este caso, se ha optado por usar una clase auxiliar SigAcumulador.java, que realiza esta parte común.
import org.apache.hadoop.io.IntWritable;
public class SigAcumulador {
	public static int suma(Iterable<IntWritable> valores) {
                int sum = 0;
                for (IntWritable valor : valores)
                        sum += valor.get();
		return sum;
	}
}

La clase combiner (clase SigCombiner.java) usa directamente el método suma ofrecida por esta clase auxiliar.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SigCombiner extends Reducer<TextPair,IntWritable,TextPair,IntWritable> {

	public void reduce(TextPair clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException {
                contexto.write(clave, new IntWritable(SigAcumulador.suma(valores)));
        }
}

Sin embargo, la parte que realiza la cuenta total de palabras, que sólo se requiere en el reducer, se complica puesto que en la primera solución se realiza en el ámbito de una única llamada reduce, mientras que en esta segunda solución se extiende a la secuencia sucesiva de llamadas reduce correspondientes a la misma palabra. Nótese que en el fichero de salida en formato texto la clave se imprime tal como indica el método toString de la clase TextPair: las dos palabras separadas por un tabulador. A continuación, se muestra ese reducer (SigReducer.java). Obsérvese que se usa una variable para guardar el acumulado entre llamadas a reduce, lo que podría considerarse un esquema con estado en el reducer. Sin embargo, generalmente, no se considera como tal, al no tratarse de información cuyo tamaño depende del volumen de los datos tratados.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SigReducer extends Reducer<TextPair,IntWritable,TextPair,IntWritable> {
	private int total = 0;
	private String palabraPrevia = "";

	public void reduce(TextPair clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException {
		String palabra = clave.getFirst().toString();
		if ((!palabra.equals(palabraPrevia)) &&
		     (palabraPrevia.length()>0)){
                	contexto.write(new TextPair(palabraPrevia,""),
				new IntWritable(total));
			total=0;
		}
		int sum =  new IntWritable(SigAcumulador.suma(valores)).get();
                contexto.write(clave, new IntWritable(sum));
		total+=sum;
		palabraPrevia=palabra;
        }
	 public void cleanup(Context contexto) throws IOException, InterruptedException {
		if (palabraPrevia.length()>0)
                	contexto.write(new TextPair(palabraPrevia,""),
				new IntWritable(total));
	
        }
}

Sin embargo, hay un aspecto adicional que se tiene que tener en cuenta. Dado que la función de partición por defecto usa la clave para decidir a qué reducer asignar cada dato generado por un mapper, los datos asociados a una determinada palabra serán asignados a distintos reducers, al tener distinta clave, no llevándose correctamente el cálculo del total.

Para solucionar este problema, es necesario definir una nueva clase Partitioner (denominada SigPartitioner) que realice la asignación a un reducer teniendo en cuenta únicamente la primera parte de la clave, y especificarla en la configuración:

job.setPartitionerClass(SigPartitioner.class);
A continuación, se muestra la clase principal de la aplicación (Sig.java).
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;


public class Sig extends Configured implements Tool {

	public static class SigPartitioner extends Partitioner<TextPair, IntWritable> {
		public int getPartition(TextPair clave, IntWritable valor, int nReducers) {
			return (clave.getFirst().hashCode() & Integer.MAX_VALUE) % nReducers;
		}
	}

        public int run(String[] args) throws Exception {
		if (args.length < 2) {
      			System.err.println("Uso: sig in [in...] out");
			System.exit(2);
		}
		
		Job job = Job.getInstance(getConf());

		job.setJarByClass(getClass());

		job.setMapperClass(SigMapper.class);
		job.setCombinerClass(SigCombiner.class);
		job.setReducerClass(SigReducer.class);

		job.setOutputKeyClass(TextPair.class);
		job.setOutputValueClass(IntWritable.class);

		job.setPartitionerClass(SigPartitioner.class);

		for (int i = 0; i < args.length-1; ++i)
			FileInputFormat.addInputPath(job, new Path(args[i]));
    		FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));
		return job.waitForCompletion(true) ? 0 : 1;
	}
        public static void main(String[] args) throws Exception {
                int resultado = ToolRunner.run(new Sig(), args);
                System.exit(resultado);
        }
}

Co-ocurrencia: solución con estado en mappers y reducers

En esta solución, como ocurría en la primera, el mapper envía como clave al reducer la palabra correspondiente, pero, a diferencia de dicha solución, envía como valor un stripe (es decir, una colección que incluye qué palabras aparecen justo a continuación de la palabra clave y cuántas veces). Se requiere, por tanto, un tipo de datos colección para almacenar este valor. Hadoop dispone de varias colecciones Writables y para esta solución vamos a usar MapWritable, una implementación del tipo genérico Map de Java donde tanto la clave como el valor (nos referimos a la clave y valor del mapa, no a los generados por mappers y reducers) son Writables.

Esta solución (ejemplo16) no requeriría, en principio, definir un nuevo Writable. Sin embargo, dado que el tipo MapWritable no proporciona un método toString específico y este método es necesario si queremos que los datos resultantes queden almacenados en ficheros de texto, hemos creado una clase derivada (a la que hemos llamado MapPrintable) simplemente para añadirle ese método. Téngase en cuenta que, por razones didácticas, no se ha pretendido en el diseño de este método que la salida final resultante en modo texto tenga el mismo formato que en las dos soluciones previas (en esta solución, todas las estadísticas relacionadas con una palabra clave aparecen en la misma línea, mientras que en las soluciones anteriores estaban en líneas diferentes; se podría modificar el método para conseguir el mismo formato pero habría que complicarlo un poco: ejemplo16 con mismo formato de salida).


import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import java.util.Map;

public class MapPrintable extends MapWritable {
        public String toString() {
                StringBuilder cadena = new StringBuilder();
                for(Map.Entry<Writable, Writable> entrada : this.entrySet())
                                cadena.append(entrada.getKey().toString()
                                    + " " + entrada.getValue().toString() + " ");
                return cadena.toString();
        }
}

El mapper (SigMapper.java) de esta solución es similar, como se comentó previamente, al incluido en el pseudo-código presentado en la figura 3.9 del libro de Jimmy Lin. En la solución que presentamos, a diferencia de la presentada en el libro, se usa la técnica del in-mapper combining, de manera que sólo se genera la salida del mapper cuando este completa su labor, dentro de la función cleanup.

Para ir almacenando la relación entre palabras clave y su stripe correspondiente, se va a usar un HashMap de Java que guarde esta asociación (HashMap<String, MapPrintable>). A continuación, se muestra el código de este >mapper.


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.StringTokenizer;
import java.util.HashMap;
import java.util.Map;

public class SigMapper extends Mapper<LongWritable, Text, Text, MapPrintable>{
        private HashMap<String, MapPrintable> tabla;

        public void setup(Context contexto) throws IOException, InterruptedException {
                tabla = new HashMap<String, MapPrintable>();
        }

	public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException {
               	StringTokenizer st = new StringTokenizer(valor.toString());
		if (st.hasMoreTokens()) {
			String previa = st.nextToken();
               		while (st.hasMoreTokens()) {
				String palabra = st.nextToken();
				MapPrintable stripe = tabla.get(previa);
				// primera aparición de esa palabra como clave
                        	if (stripe==null) {
					stripe = new MapPrintable();
                        		tabla.put(new String(previa), stripe);
					stripe.put(new Text(palabra), new IntWritable(1));
				}
                        	else {
					IntWritable c = (IntWritable) stripe.get(new Text(palabra));
					// 1ª aparición de esa palabra como siguiente
		                       	if (c==null) 
                                		c = new IntWritable(1);
                        		else 
                                		c = new IntWritable(c.get()+1);
                        		stripe.put(new Text(palabra), c);
				}
				previa = palabra;
			}
		}
	}
        public void cleanup(Context contexto) throws IOException, InterruptedException {
                for(Map.Entry<String, MapPrintable> stripe : tabla.entrySet())
                        contexto.write(new Text(stripe.getKey()), stripe.getValue());

                return;
        }

}

En cuanto al reducer, su labor básicamente es mezclar los stripes que recibe por cada palabra clave y generar como valor el stripe resultante de esa mezcla.

Como se planteó en la primera solución, dada que la funcionalidad del combiner y del reducer sólo se diferencia en el recuento del total por cada palabra clave que debe realizar el reducer, se ha optado también por crear una clase base común (SigRedCom.java) que implemente la función reduce adecuada para ambos módulos, distinguiéndose por el valor especificado en su constructor. A continuación, se muestra esa clase común.


import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.util.HashMap;
import java.util.Map;

public class SigRedCom extends Reducer<Text,MapPrintable,Text,MapPrintable> {
	private boolean isReducer=true;
	public SigRedCom(boolean reducer) {
		isReducer=reducer;
	}
	public void reduce(Text clave, Iterable<MapPrintable> valores, Context contexto) throws IOException, InterruptedException {
                MapPrintable stripeMix = new MapPrintable();
		int total=0;
		for (MapPrintable stripe : valores) {
               		for(Map.Entry<Writable, Writable> entrada : stripe.entrySet()) {
				Text palabra = (Text) entrada.getKey();
				int cuenta = ((IntWritable) entrada.getValue()).get();
				IntWritable c = (IntWritable)stripeMix.get(palabra);
				if (c == null) 
					stripeMix.put(new Text(palabra),
						new IntWritable(cuenta));
				else
					stripeMix.put(new Text(palabra),
						new IntWritable(c.get() + cuenta));
					
				if (isReducer) total+=cuenta;
			}

		}
		if (isReducer)
			stripeMix.put(new Text(""), new IntWritable(total));

                contexto.write(clave, stripeMix);
	}
}

Nótese que, para incluir el total dentro del stripe resultante de la mezcla (operación que sólo realiza el reducer), se ha optado por añadir una entrada con una palabra vacía.

Procesamiento de grafos (soluciones iterativas)

Uno de los problemas típicos que se presentan dentro del ámbito del MapReduce es el procesamiento de grafos. De hecho, el libro de Jimmy Lin le dedica un capítulo completo a este tema. A continuación, se resaltan cuáles son los características habituales de los trabajos MapReduce que procesan este tipo de estructuras de datos:

Distancias mínimas desde un nodo origen

En esta sección, vamos a estudiar un ejemplo concreto (ejemplo17): la búsqueda de las distancias mínimas de un nodo origen a todos los nodos del grafo. La estrategia usada en esta solución se corresponde básicamente con el pseudo-código presentado en la figura 5.4 del libro de Jimmy Lin, pero generalizando para grafos donde la distancia entre nodos puede ser distinta.

Antes de analizar un ejemplo concreto, vamos a mostrar el tipo Writable (InfoVertice) que se va a usar para almacenar la información asociada a un vértice. Dado que, como se comentó previamente, el mapper, además de este tipo de información, genera mensajes a los vértices vecinos, este tipo está diseñado para contener ambos datos. Nótese que este mismo tipo sería válido para distintos problemas de procesamiento de grafos.


import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.io.EOFException; 
 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.MapWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; 
import java.util.Map;

public class InfoVertice implements Writable { 
	 
	private IntWritable valor; 
	private MapWritable aristas; 
	 
	public IntWritable getValor() { 
		return valor; 
	} 
	public MapWritable getAristas() { 
		return aristas; 
	} 
	 
	public void setValor(IntWritable v) { 
		valor=v; 
	} 
	public InfoVertice(){ 
		valor = new IntWritable(); 
		aristas  = new MapWritable(); 
	} 
	public InfoVertice(InfoVertice v){ 
		this.valor = v.valor;
		this.aristas = v.aristas;
	} 

	public InfoVertice(IntWritable valor, MapWritable aristas){ 
		this.valor = valor; 
		this.aristas = aristas; 
	} 

	// constructor para obtener la información de la entrada de datos.
	// complementario del toString
	public InfoVertice(Text linea) {
		String [] palabras = linea.toString().split("\\s+");

		valor = new IntWritable(Integer.parseInt(palabras[0]));
		aristas  = new MapWritable(); 

                for (int i=1; i<palabras.length; i+=2)
			aristas.put(new Text(palabras[i]),
			    new IntWritable(Integer.parseInt(palabras[i+1])));
	} 

	// constructor para los mensajes que envían sólo el valor
	public InfoVertice(IntWritable valor) {
		this.valor = valor; 
		aristas  = null;
	} 
	 
	public void readFields(DataInput in) throws IOException { 
		valor = new IntWritable(); 
		valor.readFields(in); 
		aristas  = new MapWritable(); 
		try {
			aristas.readFields(in); 
		}
		catch (EOFException e) {
			aristas=null;
		}
	} 
	public void write(DataOutput out) throws IOException { 
		valor.write(out); 
		if (aristas!=null)
			aristas.write(out); 
	} 
	public String toString() { 
		StringBuilder cadena = new StringBuilder(valor.toString());
		if (aristas!=null)
			for(Map.Entry<Writable, Writable> arista : aristas.entrySet())
				cadena.append(" " + arista.getKey().toString()
				    + " " + arista.getValue().toString());
		return cadena.toString();
	} 
} 

A continuación, se resaltan algunos aspectos de este tipo: A continuación, se muestra un ejemplo de fichero de entrada donde hay una línea por cada vértice con campos separados por tabuladores, tal que en cada linea aparece, en primer lugar, el identificador del vértice, que actuará como clave, y, acto seguido, la información asociada a ese vértice (la que se incluirá en un objeto InfoVertice): el valor del vértice (se ha seguido el criterio de que un -1 representa el valor infinito, que será el valor inicial para todos los vértices en este algoritmo que obtiene distancias mínimas), y las parejas que representan cada arista con su identificador de nodo destino y su valor asociado.
A       -1      B       3       C       1
B       -1      D       2
C       -1      B       1
D       -1

A continuación, se muestra el mapper (GrafoMapper.java). La función map, al usarse KeyValueTextInputFormat, recibe como clave el identificador del vértice y como valor el resto de la línea, que será procesado por el constructor, poniéndose a cero el valor asociado al vértice en caso de tratarse del nodo origen. Por lo demás, se aplica el algoritmo enviando al reducer primero el propio vértice y, a continuación, los mensajes correspondientes a los vértices vecinos.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.StringTokenizer;
import java.util.Map;
 
public class GrafoMapper extends Mapper<Text, Text, Text, InfoVertice>{
	private String origen;
        public void setup(Context contexto) throws IOException, InterruptedException {
                origen = contexto.getConfiguration().get("origen");
        }

	private int suma(IntWritable s1, IntWritable s2) {
		int s = s1.get();
		if (s == -1) return -1; // infinito
		return s + s2.get();
	}	
	public void map(Text vertice, Text valor, Context contexto) throws IOException, InterruptedException {
		InfoVertice infoVert = new InfoVertice(valor);

		// pone a 0 distancia de nodo origen a sí mismo
		if (vertice.toString().equals(origen))
			infoVert.setValor(new IntWritable(0));

		// envía el vértice
                contexto.write(vertice, infoVert);

		// envía mensajes a vértices adyacentes
		MapWritable aristas = infoVert.getAristas();
		for(Map.Entry<Writable, Writable> arista : aristas.entrySet()) {
			int distancia = suma(infoVert.getValor(),
				(IntWritable)arista.getValue());
                	contexto.write((Text)arista.getKey(),
				new InfoVertice(new IntWritable(distancia)));
		}
	}
}

A continuación, se muestra el reducer (GrafoReducer.java), que debe distinguir a la hora de aplicar el algoritmo entre los dos distintos datos recibidos (el vértice propiamente dicho y los mensajes dirigidos a este vértice desde nodos que apuntan al mismo). Nótese el uso de un contador para reflejar si se ha modificado el valor asociado al vértice al aplicar el algoritmo.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

enum info {
        CAMBIOS
}

public class GrafoReducer extends Reducer<Text, InfoVertice, Text, InfoVertice> {
	private int minimo(int a, int b) {
		if (a==-1) return b;
		if (b==-1) return a;
		return ((a<b)? a : b);
	}
	public void reduce(Text vertice, Iterable<InfoVertice> valores, Context contexto) throws IOException, InterruptedException {
		InfoVertice infoVert = null;
		int distMinima = -1; // infinito
                for (InfoVertice mensaje : valores) {
			if (mensaje.getAristas()!=null) // es el vértice
				infoVert = new InfoVertice(mensaje);
			else  
				distMinima=minimo(distMinima, mensaje.getValor().get());
		}
		int distActual = infoVert.getValor().get();
		int distNueva = minimo(distActual, distMinima);
		if (distActual != distNueva) {
			contexto.getCounter(info.CAMBIOS).increment(1);
			infoVert.setValor(new IntWritable(distNueva));
		}
	        contexto.write(vertice, infoVert);
        }
}

Por último, se muestra la clase principal Grafo.java que tiene un esquema iterativo tal que en cada paso se activa un trabajo que usa como entrada el grafo resultado del paso previo, repitiéndose hasta que el contador indica que no ha habido cambios en esa iteración. Por motivos didácticos, el programa no borra los directorios intermedios utilizados.
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;


import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;


public class Grafo extends Configured implements Tool {

        public int run(String[] args) throws Exception {
		if (args.length < 2) {
      			System.err.println("Uso: grafo in out");
			System.exit(2);
		}
		
		boolean res;
		long cambios = 0;
		int nIter=0;
		String inputDir = args[0];
		String output = args[1];
		do {
			Job job = Job.getInstance(getConf());
	
			job.setJarByClass(getClass());

			job.setMapperClass(GrafoMapper.class);

			job.setReducerClass(GrafoReducer.class);
	
			job.setInputFormatClass(KeyValueTextInputFormat.class);

			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(InfoVertice.class);

			String outputDir = output + nIter++;
			FileInputFormat.addInputPath(job, new Path(inputDir));
    			FileOutputFormat.setOutputPath(job, new Path(outputDir));

			res =job.waitForCompletion(true);
			if (res) {
				cambios = job.getCounters().findCounter(info.CAMBIOS).getValue();
				inputDir=outputDir;
			}
		} while (res && (cambios>0));
                return (res? 0 : 1);

	}
        public static void main(String[] args) throws Exception {
                int resultado = ToolRunner.run(new Grafo(), args);
                System.exit(resultado);
        }
}