Bajo ningún concepto se pretende que esta guía constituya un curso exhaustivo sobre cómo programar en este entorno, estando disponible en Internet numerosa documentación sobre este tema, y menos sobre el despliegue de programas MapReduce sobre un cluster. A continuación, se incluyen tres referencias típicas para profundizar en todos estos aspectos.
Esta referencia describe cómo instalar este entorno en configuraciones de tipo local o localhost.
Sin embargo, para agilizar el trabajo al lector, se va a partir de un entorno ya instalado en triqui, de manera que éste sólo tenga que definir algunas variables de entorno en su cuenta para poder utilizarlo. Concretamente, debería incluir las siguientes definiciones en el fichero .bashrc (o equivalente) de su cuenta:
export HADOOP_PREFIX=~ssoo/ppd/hadoop-2.5.1 PATH=$PATH:$HADOOP_PREFIX/bin
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Null { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Uso: null in out"); System.exit(2); } // Crea un trabajo MapReduce Job job = Job.getInstance(); // Especifica el JAR del mismo job.setJarByClass(Null.class); // Especifica directorio de entrada y de salida FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // Arranca el trabajo y espera que termine System.exit(job.waitForCompletion(true) ? 0 : 1); } }
javac -cp `hadoop classpath` *.java # compila jar cvf Null.jar *.class # crea el JAR hadoop jar Null.jar Null input output # nombre del JAR, de la clase principal y args del programa
Los valores por defecto para un trabajo MapReduce corresponden a haber incluido en el programa las siguientes sentencias:
job.setInputFormatClass(TextInputFormat.class);Este formato de entrada es de tipo texto y considera cada línea del fichero como un registro invocando, por tanto, la función map del programa por cada línea pasándole como clave el offset dentro del fichero que corresponde al principio de la línea (de tipo LongWritable; Writable es el tipo serializable que usa MapReduce para gestionar todos los datos; en este caso, se corresponde con un long) y como valor el contenido de la línea (de tipo Text, la versión Writable del String).
job.setMapperClass(Mapper.class);Es un Mapper identidad, que simplemente copia la clave y el valor recibido.
job.setMapOutputKeyClass(LongWritable.class);El tipo de datos de la clave generada por map. Dado que la función map usada copia la clave recibida, es de tipo LongWritable.
job.setMapOutputValueClass(Text.class);El tipo de datos del valor generado por map. Dado que la función map usada copia el valor recibido, es de tipo Text.
job.setPartitionerClass(HashPartitioner.class);Es la clase que realiza la asignación de claves a reducers, usando una función hash para ello.
job.setNumReduceTasks(1);Sólo usa un Reducer; por eso, hay un único fichero en el directorio de salida.
job.setReducerClass(Reducer.class);Es un Reducer identidad, que simplemente copia la clave y el valor recibido.
job.setOutputKeyClass(LongWritable.class);El tipo de datos de la clave generada por reduce y por map excepto si se ha especificado uno distinto para map usando setMapOutputKeyClass. Dado que la función reduce usada copia la clave recibida, es de tipo LongWritable.
job.setOutputValueClass(Text.class);El tipo de datos del valor generado por reduce y por map excepto si se ha especificado uno distinto para map usando setMapValueKeyClass. Dado que la función reduce usada copia el valor recibido, es de tipo Text.
job.setOutputFormatClass(TextOutputFormat.class);Este formato de salida es de tipo texto y consiste en la clave y el valor separados, por defecto, por un tabulador (para pasar a texto los valores generados por reduce, el entorno de ejecución invoca el método toString de los respectivas clases Writables).
Se sugiere al lector que modifique el código de Null.java para especificar dos reducers y lo ejecute analizando la salida producida por el programa.
Para terminar esta primera toma de contacto, hay que explicar que el mandato hadoop gestiona sus propios argumentos de línea (veremos un ejemplo en la siguiente sección) y, por tanto, es necesario separar dentro de los argumentos de línea aquellos que corresponden a Hadoop y los que van destinados a la aplicación. La clase Tool facilita este trabajo. A continuación, se presenta la nueva versión de la clase Null.java usando este mecanismo.
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.conf.Configured; public class Null extends Configured implements Tool { public int run(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: null in out"); System.exit(2); } Job job = Job.getInstance(getConf()); // le pasa la config. job.setJarByClass(getClass()); // pequeño cambio FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int resultado = ToolRunner.run(new Null(), args); System.exit(resultado); } }
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.conf.Configured; public class WC extends Configured implements Tool { public int run(String[] args) throws Exception { if (args.length < 2) { System.err.println("Uso: wc in [in...] out"); System.exit(2); } Job job = Job.getInstance(getConf()); job.setJarByClass(getClass()); job.setMapperClass(WCMapper.class); job.setCombinerClass(WCReducer.class); // Uso de un combiner job.setReducerClass(WCReducer.class); // Mismo tipo de salida para Mapper y Reducer job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < args.length-1; ++i) FileInputFormat.addInputPath(job, new Path(args[i])); FileOutputFormat.setOutputPath(job, new Path(args[args.length-1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int resultado = ToolRunner.run(new WC(), args); System.exit(resultado); } }
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.StringTokenizer; public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(valor.toString()); while (st.hasMoreTokens()) contexto.write(new Text(st.nextToken()), new IntWritable(1)); } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable> { public void reduce(Text clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException { int sum = 0; for (IntWritable valor : valores) sum += valor.get(); contexto.write(clave, new IntWritable(sum)); } }
Se plantea añadir las siguientes dos características a la funcionalidad de la aplicación:
A continuación, se muestra el código del mapper, que es la única clase que cambia al incluir esta funcionalidad (ejemplo4).
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.StringTokenizer; // tipo de datos para implementar los contadores enum incidencias { LINEAS_VACIAS, PALABRAS_CORTAS } public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private int longMin; // función invocada al principio de la ejecución de este mapper public void setup(Context contexto) throws IOException, InterruptedException { // obtiene el parámetro de configuración longMin = contexto.getConfiguration().getInt("longitud", 1); } public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException { if (valor.getLength() == 0) // incrementa el contador de líneas vacías contexto.getCounter(incidencias.LINEAS_VACIAS).increment(1); else { StringTokenizer st = new StringTokenizer(valor.toString()); while (st.hasMoreTokens()) { String palabra = st.nextToken(); if (palabra.length() >= longMin) contexto.write(new Text(palabra), new IntWritable(1)); else // incrementa el contador de palabras cortas contexto.getCounter(incidencias.PALABRAS_CORTAS).increment(1); } } } // función invocada al final de la ejecución de este mapper public void cleanup(Context contexto) throws IOException, InterruptedException { return; // no hace nada; sólo por motivos pedagógicos } }
Para pasar el argumento de configuración, el programa se invocará de la siguiente forma:
hadoop jar WC.jar WC -D longitud=4 input output
A continuación, se muestra el código del mapper y del reducer para este ejemplo (ejemplo5).
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.StringTokenizer; public class SigMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private String termino; public void setup(Context contexto) throws IOException, InterruptedException { termino = contexto.getConfiguration().get("palabra", ""); } public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException { boolean encontrada=false; StringTokenizer st = new StringTokenizer(valor.toString()); while (st.hasMoreTokens()) { String palabra = st.nextToken(); if (encontrada) { contexto.write(new Text(palabra), new IntWritable(1)); encontrada=false; } if (palabra.equals(termino)) encontrada=true; } } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class SigReducer extends Reducer<Text,IntWritable,Text,IntWritable> { public void reduce(Text clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException { int sum = 0; for (IntWritable valor : valores) sum += valor.get(); contexto.write(clave, new IntWritable(sum)); } }
A continuación, se muestra el código del mapper y del reducer para este ejemplo (ejemplo6). Nótese el uso en ambos módulos de IntWritable para la clave, al ser ésta numérica.
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.StringTokenizer; enum incidencias { PALABRAS_VETADAS } public class WLMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{ private String vetada; public void setup(Context contexto) throws IOException, InterruptedException { vetada = contexto.getConfiguration().get("vetada"); } public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(valor.toString()); while (st.hasMoreTokens()) { String palabra = st.nextToken(); if ((vetada != null) && palabra.startsWith(vetada)) contexto.getCounter(incidencias.PALABRAS_VETADAS).increment(1); else contexto.write(new IntWritable(palabra.length()), new IntWritable(1)); } } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WLReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> { public void reduce(IntWritable clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException { int sum = 0; for (IntWritable valor : valores) sum += valor.get(); contexto.write(clave, new IntWritable(sum)); } }
Para ilustrar este neuvo formato, supongamos que durante varios meses hemos estado ejecutando la aplicación WC sobre diversos contenidos de entrada y disponemos de los resultados de cada ejecución en un directorio independiente. Nos planteamos realizar una nueva aplicación que reciba como fuente de datos de entrada los distintos directorios de resultados y que lleve a cabo el recuento total de palabras.
A continuación, se muestra el código del mapper y del reducer (la clase principal es similar a la de ejemplos previos, únicamente especificando el nuevo formato de entrada usando job.setInputFormatClass(KeyValueTextInputFormat.class)) para este ejemplo (ejemplo7). Nótese el uso del formato de entrada KeyValueTextInputFormat y cómo esto afecta a los parámetros de mappers y reducers.
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.StringTokenizer; public class WCTotMapper extends Mapper<Text, Text, Text, IntWritable>{ public void map(Text clave, Text valor, Context contexto) throws IOException, InterruptedException { contexto.write(clave, new IntWritable(Integer.parseInt(valor.toString()))); } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WCTotReducer extends Reducer<Text,IntWritable,Text,IntWritable> { public void reduce(Text clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException { int sum = 0; for (IntWritable valor : valores) sum += valor.get(); contexto.write(clave, new IntWritable(sum)); } }
Se plantea crear una versión modificada del programa anterior que calcule la media asociada a cada palabra, es decir, el promedio entre el número total de veces que aparece y el número de recuentos en los que está presente, redondeada a entero.
A continuación, se muestra sólo el código del reducer para este ejemplo (ejemplo8) (NOTA: este ejemplo se corresponde básicamente con el presentado en la figura 3.4 del libro de Jimmy Lin).
import java.io.IOException; emport org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WCMedReducer extends Reducer<Text,IntWritable,Text,IntWritable> { public void reduce(Text clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException { int sum = 0; int n = 0; for (IntWritable valor : valores) { sum += valor.get(); ++n; } int media = (sum + n - 1)/n; // truco para que redondee contexto.write(clave, new IntWritable(media)); } }
Para poder comprobar el problema, vamos a suponer que en la entrada de datos de este trabajo puede aparecer en una partición asignada a un mapper el resultado de varios recuentos, para, de esta manera, asegurarnos de que aparece el problema al hacer que un mismo mapper se pueda encontrar con varias apariciones del mismo término (lo que forzaría que el combiner hiciera la media de más de un elemento).
Se sugiere, por tanto, que el lector concatene en un mismo fichero de entrada los resultados de varios recuentos (realmente, juntar los datos de entrada en un mismo fichero, es una técnica recomendable en este entorno puesto que el uso de ficheros de tamaño pequeño no es eficiente, al lanzarse un mapper por cada uno) y active el uso de combiners.
¿Tenemos que renunciar a la optimización que proporciona el uso de combiners para este ejemplo? La respuesta es afortunadamente negativa, tal como se analiza en el siguiente apartado.
En primer lugar, será necesario definir un nuevo tipo Writable para almacenar la pareja de enteros (IntPair.java). Aunque en esta guía no vamos a explicar en profundidad cómo definir un nuevo Writable (en Internet hay mucha información sobre este tema y, lo que es mejor, hay un montón de Writables ya programados para casi todo lo que se nos pueda ocurrir), sí se considera conveniente resaltar algunos aspectos sobre esta labor basándonos en la clase que se presenta a continuación. Además de los típicos constructores y funciones de tipo get, nos encontramos con las funciones readFields y write que llevan a cabo, respectivamente, la deserialización y serialización de los datos, basándose meramente en invocar esas mismas funciones sobre los campos que componen la clase. Asimismo, si queremos que ese tipo de datos pueda ser almacenado en un fichero de texto, es necesario definir el método toString.
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; public class IntPair implements Writable { private IntWritable first; private IntWritable second; public IntWritable getFirst() { return first; } public IntWritable getSecond() { return second; } public IntPair(){ first = new IntWritable(); second = new IntWritable(); } public IntPair(int first, int second){ this(new IntWritable(first), new IntWritable(second)); } public IntPair(IntWritable first, IntWritable second){ this.first = first; this.second = second; } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } public String toString() { return first.toString() + "\t" + second.toString(); } }
job.setMapperClass(WCMedMapper.class); job.setCombinerClass(WCMedCombiner.class); job.setReducerClass(WCMedReducer.class); job.setMapOutputValueClass(IntPair.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class);
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.StringTokenizer; public class WCMedMapper extends Mapper<Text, Text, Text, IntPair>{ public void map(Text clave, Text valor, Context contexto) throws IOException, InterruptedException { int val = Integer.parseInt(valor.toString()); contexto.write(clave, new IntPair(val, 1)); } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WCMedCombiner extends Reducer<Text,IntPair,Text,IntPair> { public void reduce(Text clave, Iterable<IntPair> valores, Context contexto) throws IOException, InterruptedException { int sum = 0; int n = 0; for (IntPair valor : valores) { sum += valor.getFirst().get(); n += valor.getSecond().get(); } contexto.write(clave, new IntPair(sum,n)); } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WCMedReducer extends Reducer<Text,IntPair,Text,DoubleWritable> { public void reduce(Text clave, Iterable<IntPair> valores, Context contexto) throws IOException, InterruptedException { int sum = 0; int n = 0; for (IntPair valor : valores) { sum += valor.getFirst().get(); n += valor.getSecond().get(); } double media = (double)sum/n; contexto.write(clave, new DoubleWritable(media)); } }
Sin embargo, si lo considera oportuno para facilitar el procesamiento, el programador puede incluir estado tanto en un mapper como en un reducer, pero siempre valorando si el tamaño que ocupa esa información de estado en memoria puede causar problemas de escalabilidad en la aplicación.
A continuación, se muestra una versión del mapper (ejemplo10) del programa que cuenta palabras que usa estado para combinar los resultados (en el libro de Jimmy Lin a esta técnica se la denomina in-mapper combining y corresponde al programa de la figura 3.3).
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.StringTokenizer; import java.util.HashMap; import java.util.Map; public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private HashMap<String, Integer> tabla; public void setup(Context contexto) throws IOException, InterruptedException { tabla = new HashMap<String, Integer>(); } public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(valor.toString()); while (st.hasMoreTokens()) { String palabra = st.nextToken(); Integer c = tabla.get(palabra); if (c==null) c = new Integer(1); else c = new Integer(++c); tabla.put(new String(palabra), c); } } public void cleanup(Context contexto) throws IOException, InterruptedException { for(Map.Entry<String, Integer> palabra : tabla.entrySet()) contexto.write(new Text(palabra.getKey()), new IntWritable(palabra.getValue())); return; } }
job.setNumReduceTasks(0);
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.conf.Configured; public class Mayus extends Configured implements Tool { public int run(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: mayus in out"); System.exit(2); } Job job = Job.getInstance(getConf()); job.setJarByClass(getClass()); job.setMapperClass(MayusMapper.class); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int resultado = ToolRunner.run(new Mayus(), args); System.exit(resultado); } }
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MayusMapper extends Mapper<LongWritable, Text, NullWritable, Text>{ public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException { String linea = valor.toString().toUpperCase(); contexto.write(NullWritable.get(), new Text(linea)); } }
A continuación, se plantea un problema de las mismas características (ejemplo12) que realiza el filtrado de los datos de entrada dejando en la salida sólo aquellas líneas que incluyan un determinado valor que se recibe como argumento (similar al grep de UNIX). A continuación, se muestra el código del mapper de este ejemplo.
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class GrepMapper extends Mapper<LongWritable, Text, NullWritable, Text>{ private String cadena; public void setup(Context contexto) throws IOException, InterruptedException { cadena = contexto.getConfiguration().get("cadena"); } public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException { if (valor.toString().contains(cadena)) contexto.write(NullWritable.get(), valor); } }
A continuación, se plantea como ejemplo una aplicación que requiere la ejecución encadenada de dos trabajos:
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters.Counter; public class Jobs extends Configured implements Tool { public int run(String[] args) throws Exception { if (args.length < 2) { System.err.println("Uso: jobs in [in...] out"); System.exit(2); } Job job = Job.getInstance(getConf()); job.setJarByClass(getClass()); job.setMapperClass(SigMapper.class); job.setCombinerClass(SigReducer.class); job.setReducerClass(SigReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < args.length-1; ++i) FileInputFormat.addInputPath(job, new Path(args[i])); // se usa un directorio intermedio para que el primer // trabajo deje sus datos al segundo Path tmp = new Path(args[args.length-1] + "-tmp"); FileOutputFormat.setOutputPath(job, tmp); if (!job.waitForCompletion(true)) return 1; long total = job.getCounters().findCounter(contador.NUM_PALABRAS).getValue(); job = Job.getInstance(getConf()); job.setJarByClass(getClass()); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setMapperClass(NormMapper.class); job.setNumReduceTasks(0); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); FileInputFormat.addInputPath(job, tmp); FileOutputFormat.setOutputPath(job, new Path(args[args.length-1])); job.getConfiguration().setLong("total", total); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int resultado = ToolRunner.run(new Jobs(), args); System.exit(resultado); } }Observe que el primer trabajo usa tanto mapper como reducer (SigMapper.java y SigReducer.java, respectivamente), mientras que el segundo, que realiza la normalización, sólo requiere mapper (NormMapper.java). Nótese, asimismo, el uso de un directorio temporal para pasar los datos del primer trabajo al segundo. Por razones pedagógicas, no se ha borrado el directorio temporal.
La resolución de este problema en MapReduce presenta ciertas características interesantes que pueden aplicarse a otro tipo de problemas.
Para simplificar el problema, tal como hicimos en un ejercicio previo, por cada palabra del repositorio de entrada, sólo vamos a considerar como co-ocurrente aquélla que aparece justo a continuación de la misma.
Se trata, por tanto, de una generalización del ejercicio previo que realizaba el recuento de qué palabras aparecen a continuación de una dada: en este caso, lo haremos para todas las palabras. Además, calcularemos el total por cada palabra clave.
Tomando como partida la solución de ese ejercicio anterior, la principal diferencia es que en este caso el mapper sí que tiene que emitir la palabra que está analizando en ese momento (en el ejemplo previo, no era necesario puesto que era una palabra especificada por el usuario). Por tanto, debería generar tres datos: la palabra que está analizando, la siguiente y un contador a 1 para realizar el acumulado. ¿Cómo vamos a organizar esos tres datos?
Dependiendo de ello, vamos a plantear tres soluciones:
La clase principal (Sig.java) es igual que la del ejercicio previo pero especificando como valor de salida del mapper y del reducer un tipo TextIntPair en vez de un IntWritable.
A continuación, se muestra el mapper (SigMapper.java) que es similar al del ejercicio previo pero extendido a todas las palabras y usando un TextIntPair como valor.
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.StringTokenizer; public class SigMapper extends Mapper<LongWritable, Text, Text, TextIntPair>{ public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(valor.toString()); if (st.hasMoreTokens()) { String previa = st.nextToken(); while (st.hasMoreTokens()) { String palabra = st.nextToken(); contexto.write(new Text(previa), new TextIntPair(palabra, 1)); previa = palabra; } } } }
En cuanto al cálculo del total para cada palabra, requerido por el reducer pero que no debe estar presente en el combiner, es directo: basta con acumular el recuento de cada palabra que aparece a continuación dentro de la misma llamada reduce.
Dada que la funcionalidad del combiner y del reducer sólo se diferencia en el recuento del total por cada palabra clave que debe realizar el reducer, en este caso se ha optado por crear una clase base común (SigRedCom.java) que implemente la función reduce adecuada para ambos módulos, distinguiéndose por el valor especificado en su constructor. A continuación, se muestra esa clase común. Observe el uso de una estructura HashMap en cada invocación de reduce.
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.util.HashMap; import java.util.Map; public class SigRedCom extends Reducer<Text,TextIntPair,Text,TextIntPair> { private boolean isReducer=true; public SigRedCom(boolean reducer) { isReducer=reducer; } public void reduce(Text clave, Iterable<TextIntPair> valores, Context contexto) throws IOException, InterruptedException { HashMap<String, Integer> tabla = new HashMap<String, Integer>(); int total=0; for (TextIntPair valor : valores) { String palabra = valor.getFirst().toString(); int cuenta = valor.getSecond().get(); Integer c = tabla.get(palabra); if (c==null) c = new Integer(cuenta); else c = new Integer(c + cuenta); tabla.put(new String(palabra), c); if (isReducer) total+=cuenta; } for(Map.Entry<String, Integer> palabra : tabla.entrySet()) contexto.write(clave, new TextIntPair(palabra.getKey(), palabra.getValue())); if (isReducer) contexto.write(clave, new TextIntPair("", total)); } }
import java.io.*; import org.apache.hadoop.io.*; public class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } public boolean equals(Object o) { if (o instanceof TextPair) { TextPair tp = (TextPair) o; return first.equals(tp.first) && second.equals(tp.second); } return false; } public String toString() { return first + "\t" + second; } public int compareTo(TextPair tp) { int cmp = first.compareTo(tp.first); if (cmp != 0) { return cmp; } return second.compareTo(tp.second); } }
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.StringTokenizer; public class SigMapper extends Mapper<LongWritable, Text, TextPair, IntWritable>{ public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(valor.toString()); if (st.hasMoreTokens()) { String previa = st.nextToken(); while (st.hasMoreTokens()) { String palabra = st.nextToken(); contexto.write(new TextPair(previa, palabra), new IntWritable(1)); previa = palabra; } } } }
import org.apache.hadoop.io.IntWritable; public class SigAcumulador { public static int suma(Iterable<IntWritable> valores) { int sum = 0; for (IntWritable valor : valores) sum += valor.get(); return sum; } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class SigCombiner extends Reducer<TextPair,IntWritable,TextPair,IntWritable> { public void reduce(TextPair clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException { contexto.write(clave, new IntWritable(SigAcumulador.suma(valores))); } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class SigReducer extends Reducer<TextPair,IntWritable,TextPair,IntWritable> { private int total = 0; private String palabraPrevia = ""; public void reduce(TextPair clave, Iterable<IntWritable> valores, Context contexto) throws IOException, InterruptedException { String palabra = clave.getFirst().toString(); if ((!palabra.equals(palabraPrevia)) && (palabraPrevia.length()>0)){ contexto.write(new TextPair(palabraPrevia,""), new IntWritable(total)); total=0; } int sum = new IntWritable(SigAcumulador.suma(valores)).get(); contexto.write(clave, new IntWritable(sum)); total+=sum; palabraPrevia=palabra; } public void cleanup(Context contexto) throws IOException, InterruptedException { if (palabraPrevia.length()>0) contexto.write(new TextPair(palabraPrevia,""), new IntWritable(total)); } }
Para solucionar este problema, es necesario definir una nueva clase Partitioner (denominada SigPartitioner) que realice la asignación a un reducer teniendo en cuenta únicamente la primera parte de la clave, y especificarla en la configuración:
job.setPartitionerClass(SigPartitioner.class);A continuación, se muestra la clase principal de la aplicación (Sig.java).
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.conf.Configured; public class Sig extends Configured implements Tool { public static class SigPartitioner extends Partitioner<TextPair, IntWritable> { public int getPartition(TextPair clave, IntWritable valor, int nReducers) { return (clave.getFirst().hashCode() & Integer.MAX_VALUE) % nReducers; } } public int run(String[] args) throws Exception { if (args.length < 2) { System.err.println("Uso: sig in [in...] out"); System.exit(2); } Job job = Job.getInstance(getConf()); job.setJarByClass(getClass()); job.setMapperClass(SigMapper.class); job.setCombinerClass(SigCombiner.class); job.setReducerClass(SigReducer.class); job.setOutputKeyClass(TextPair.class); job.setOutputValueClass(IntWritable.class); job.setPartitionerClass(SigPartitioner.class); for (int i = 0; i < args.length-1; ++i) FileInputFormat.addInputPath(job, new Path(args[i])); FileOutputFormat.setOutputPath(job, new Path(args[args.length-1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int resultado = ToolRunner.run(new Sig(), args); System.exit(resultado); } }
Esta solución (ejemplo16) no requeriría, en principio, definir un nuevo Writable. Sin embargo, dado que el tipo MapWritable no proporciona un método toString específico y este método es necesario si queremos que los datos resultantes queden almacenados en ficheros de texto, hemos creado una clase derivada (a la que hemos llamado MapPrintable) simplemente para añadirle ese método. Téngase en cuenta que, por razones didácticas, no se ha pretendido en el diseño de este método que la salida final resultante en modo texto tenga el mismo formato que en las dos soluciones previas (en esta solución, todas las estadísticas relacionadas con una palabra clave aparecen en la misma línea, mientras que en las soluciones anteriores estaban en líneas diferentes; se podría modificar el método para conseguir el mismo formato pero habría que complicarlo un poco: ejemplo16 con mismo formato de salida).
import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; import java.util.Map; public class MapPrintable extends MapWritable { public String toString() { StringBuilder cadena = new StringBuilder(); for(Map.Entry<Writable, Writable> entrada : this.entrySet()) cadena.append(entrada.getKey().toString() + " " + entrada.getValue().toString() + " "); return cadena.toString(); } }
Para ir almacenando la relación entre palabras clave y su stripe correspondiente, se va a usar un HashMap de Java que guarde esta asociación (HashMap<String, MapPrintable>). A continuación, se muestra el código de este >mapper.
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.StringTokenizer; import java.util.HashMap; import java.util.Map; public class SigMapper extends Mapper<LongWritable, Text, Text, MapPrintable>{ private HashMap<String, MapPrintable> tabla; public void setup(Context contexto) throws IOException, InterruptedException { tabla = new HashMap<String, MapPrintable>(); } public void map(LongWritable clave, Text valor, Context contexto) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(valor.toString()); if (st.hasMoreTokens()) { String previa = st.nextToken(); while (st.hasMoreTokens()) { String palabra = st.nextToken(); MapPrintable stripe = tabla.get(previa); // primera aparición de esa palabra como clave if (stripe==null) { stripe = new MapPrintable(); tabla.put(new String(previa), stripe); stripe.put(new Text(palabra), new IntWritable(1)); } else { IntWritable c = (IntWritable) stripe.get(new Text(palabra)); // 1ª aparición de esa palabra como siguiente if (c==null) c = new IntWritable(1); else c = new IntWritable(c.get()+1); stripe.put(new Text(palabra), c); } previa = palabra; } } } public void cleanup(Context contexto) throws IOException, InterruptedException { for(Map.Entry<String, MapPrintable> stripe : tabla.entrySet()) contexto.write(new Text(stripe.getKey()), stripe.getValue()); return; } }
Como se planteó en la primera solución, dada que la funcionalidad del combiner y del reducer sólo se diferencia en el recuento del total por cada palabra clave que debe realizar el reducer, se ha optado también por crear una clase base común (SigRedCom.java) que implemente la función reduce adecuada para ambos módulos, distinguiéndose por el valor especificado en su constructor. A continuación, se muestra esa clase común.
import java.io.IOException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.util.HashMap; import java.util.Map; public class SigRedCom extends Reducer<Text,MapPrintable,Text,MapPrintable> { private boolean isReducer=true; public SigRedCom(boolean reducer) { isReducer=reducer; } public void reduce(Text clave, Iterable<MapPrintable> valores, Context contexto) throws IOException, InterruptedException { MapPrintable stripeMix = new MapPrintable(); int total=0; for (MapPrintable stripe : valores) { for(Map.Entry<Writable, Writable> entrada : stripe.entrySet()) { Text palabra = (Text) entrada.getKey(); int cuenta = ((IntWritable) entrada.getValue()).get(); IntWritable c = (IntWritable)stripeMix.get(palabra); if (c == null) stripeMix.put(new Text(palabra), new IntWritable(cuenta)); else stripeMix.put(new Text(palabra), new IntWritable(c.get() + cuenta)); if (isReducer) total+=cuenta; } } if (isReducer) stripeMix.put(new Text(""), new IntWritable(total)); contexto.write(clave, stripeMix); } }
Antes de analizar un ejemplo concreto, vamos a mostrar el tipo Writable (InfoVertice) que se va a usar para almacenar la información asociada a un vértice. Dado que, como se comentó previamente, el mapper, además de este tipo de información, genera mensajes a los vértices vecinos, este tipo está diseñado para contener ambos datos. Nótese que este mismo tipo sería válido para distintos problemas de procesamiento de grafos.
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.EOFException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import java.util.Map; public class InfoVertice implements Writable { private IntWritable valor; private MapWritable aristas; public IntWritable getValor() { return valor; } public MapWritable getAristas() { return aristas; } public void setValor(IntWritable v) { valor=v; } public InfoVertice(){ valor = new IntWritable(); aristas = new MapWritable(); } public InfoVertice(InfoVertice v){ this.valor = v.valor; this.aristas = v.aristas; } public InfoVertice(IntWritable valor, MapWritable aristas){ this.valor = valor; this.aristas = aristas; } // constructor para obtener la información de la entrada de datos. // complementario del toString public InfoVertice(Text linea) { String [] palabras = linea.toString().split("\\s+"); valor = new IntWritable(Integer.parseInt(palabras[0])); aristas = new MapWritable(); for (int i=1; i<palabras.length; i+=2) aristas.put(new Text(palabras[i]), new IntWritable(Integer.parseInt(palabras[i+1]))); } // constructor para los mensajes que envían sólo el valor public InfoVertice(IntWritable valor) { this.valor = valor; aristas = null; } public void readFields(DataInput in) throws IOException { valor = new IntWritable(); valor.readFields(in); aristas = new MapWritable(); try { aristas.readFields(in); } catch (EOFException e) { aristas=null; } } public void write(DataOutput out) throws IOException { valor.write(out); if (aristas!=null) aristas.write(out); } public String toString() { StringBuilder cadena = new StringBuilder(valor.toString()); if (aristas!=null) for(Map.Entry<Writable, Writable> arista : aristas.entrySet()) cadena.append(" " + arista.getKey().toString() + " " + arista.getValue().toString()); return cadena.toString(); } }
A -1 B 3 C 1 B -1 D 2 C -1 B 1 D -1
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.util.StringTokenizer; import java.util.Map; public class GrafoMapper extends Mapper<Text, Text, Text, InfoVertice>{ private String origen; public void setup(Context contexto) throws IOException, InterruptedException { origen = contexto.getConfiguration().get("origen"); } private int suma(IntWritable s1, IntWritable s2) { int s = s1.get(); if (s == -1) return -1; // infinito return s + s2.get(); } public void map(Text vertice, Text valor, Context contexto) throws IOException, InterruptedException { InfoVertice infoVert = new InfoVertice(valor); // pone a 0 distancia de nodo origen a sí mismo if (vertice.toString().equals(origen)) infoVert.setValor(new IntWritable(0)); // envía el vértice contexto.write(vertice, infoVert); // envía mensajes a vértices adyacentes MapWritable aristas = infoVert.getAristas(); for(Map.Entry<Writable, Writable> arista : aristas.entrySet()) { int distancia = suma(infoVert.getValor(), (IntWritable)arista.getValue()); contexto.write((Text)arista.getKey(), new InfoVertice(new IntWritable(distancia))); } } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; enum info { CAMBIOS } public class GrafoReducer extends Reducer<Text, InfoVertice, Text, InfoVertice> { private int minimo(int a, int b) { if (a==-1) return b; if (b==-1) return a; return ((a<b)? a : b); } public void reduce(Text vertice, Iterable<InfoVertice> valores, Context contexto) throws IOException, InterruptedException { InfoVertice infoVert = null; int distMinima = -1; // infinito for (InfoVertice mensaje : valores) { if (mensaje.getAristas()!=null) // es el vértice infoVert = new InfoVertice(mensaje); else distMinima=minimo(distMinima, mensaje.getValor().get()); } int distActual = infoVert.getValor().get(); int distNueva = minimo(distActual, distMinima); if (distActual != distNueva) { contexto.getCounter(info.CAMBIOS).increment(1); infoVert.setValor(new IntWritable(distNueva)); } contexto.write(vertice, infoVert); } }
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters.Counter; public class Grafo extends Configured implements Tool { public int run(String[] args) throws Exception { if (args.length < 2) { System.err.println("Uso: grafo in out"); System.exit(2); } boolean res; long cambios = 0; int nIter=0; String inputDir = args[0]; String output = args[1]; do { Job job = Job.getInstance(getConf()); job.setJarByClass(getClass()); job.setMapperClass(GrafoMapper.class); job.setReducerClass(GrafoReducer.class); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(InfoVertice.class); String outputDir = output + nIter++; FileInputFormat.addInputPath(job, new Path(inputDir)); FileOutputFormat.setOutputPath(job, new Path(outputDir)); res =job.waitForCompletion(true); if (res) { cambios = job.getCounters().findCounter(info.CAMBIOS).getValue(); inputDir=outputDir; } } while (res && (cambios>0)); return (res? 0 : 1); } public static void main(String[] args) throws Exception { int resultado = ToolRunner.run(new Grafo(), args); System.exit(resultado); } }