import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * This class checks a MapReduce job: the generic type parameters of the
 * Mapper, Reducer and Combiner classes.
 *
 * @author Pierre Nerzic - pierre.nerzic@univ-rennes1.fr
 *
 * usage:
 * YarnJob job = new YarnJob(conf, "name");
 * job.setJarByClass(MyDriver.class);
 * job.setMapperClass(MyMapper.class);
 * job.setCombinerClass(MyCombiner.class);
 * job.setReducerClass(MyReducer.class);
 * job.setInputDirRecursive(false);
 * job.addInputPath(new Path(...));
 * job.setInputFormatClass(TextInputFormat.class);
 * job.setOutputPath(new Path(...));
 * job.setOutputFormatClass(TextOutputFormat.class);
 * boolean success = job.waitForCompletion(true);
 */
public class YarnJob {

    // Yarn job controlled by this class
    private Job job;

    // arrays of the generic types that parameterize the mapper, the reducer, etc.
    private Type[] inputGenericTypes;
    private Type[] mapperGenericTypes;
    private Type[] combinerGenericTypes;
    private Type[] reducerGenericTypes;

    /**
     * constructor
     * @param conf Yarn configuration, use this.getConf() inside a Configured instance
     * @param jobName name of the job, visible in the logs
     */
    public YarnJob(Configuration conf, String jobName) throws IOException {
        job = Job.getInstance(conf, jobName);
    }

    /**
     * Set the Jar by finding where a given class came from.
     * see {@link org.apache.hadoop.mapreduce.Job#setJarByClass(Class cls) setJarByClass}
     */
    public void setJarByClass(Class<?> classe) {
        job.setJarByClass(classe);
    }

    /**
     * see {@link org.apache.hadoop.mapreduce.Job#setMapperClass(Class cls) setMapperClass}
     */
    public void setMapperClass(Class<? extends Mapper<?,?,?,?>> classe) {
        // set the mapper class on the job
        job.setMapperClass(classe);
        // get the generic type parameters of the mapper
        ParameterizedType superclasse = (ParameterizedType) classe.getGenericSuperclass();
        mapperGenericTypes = superclasse.getActualTypeArguments();
    }

    /**
     * see {@link org.apache.hadoop.mapreduce.Job#setCombinerClass(Class cls) setCombinerClass}
     */
    public void setCombinerClass(Class<? extends Reducer<?,?,?,?>> classe) throws Exception {
        job.setCombinerClass(classe);
        // get the generic type parameters of the combiner
        ParameterizedType superclasse = (ParameterizedType) classe.getGenericSuperclass();
        combinerGenericTypes = superclasse.getActualTypeArguments();
    }

    /**
     * see {@link org.apache.hadoop.mapreduce.Job#setReducerClass(Class cls) setReducerClass}
     */
    public void setReducerClass(Class<? extends Reducer<?,?,?,?>> classe) {
        job.setReducerClass(classe);
        // get the generic type parameters of the reducer
        ParameterizedType superclasse = (ParameterizedType) classe.getGenericSuperclass();
        reducerGenericTypes = superclasse.getActualTypeArguments();
    }

    /**
     * see {@link org.apache.hadoop.mapreduce.Job#setInputFormatClass(Class cls) setInputFormatClass}
     */
    public void setInputFormatClass(Class<? extends InputFormat<?,?>> classe) {
        job.setInputFormatClass(classe);
        // get the generic type parameters of the input format
        ParameterizedType superclasse = (ParameterizedType) classe.getGenericSuperclass();
        inputGenericTypes = superclasse.getActualTypeArguments();
    }

    /**
     * see {@link org.apache.hadoop.mapreduce.Job#setOutputFormatClass(Class cls) setOutputFormatClass}
     */
    @SuppressWarnings("rawtypes")
    public void setOutputFormatClass(Class<? extends OutputFormat> classe) {
        job.setOutputFormatClass(classe);
        // the generic type parameters of this class cannot be checked
    }

    /**
     * see {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat#addInputPath(Job job, Path path) addInputPath}
     */
    public void addInputPath(Path path) throws IOException {
        FileInputFormat.addInputPath(job, path);
    }

    /**
     * see {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat#setInputDirRecursive(Job job, boolean inputDirRecursive) setInputDirRecursive}
     */
    public void setInputDirRecursive(boolean inputDirRecursive) {
        FileInputFormat.setInputDirRecursive(job, inputDirRecursive);
    }

    /**
     * see {@link org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#setOutputPath(Job job, Path outputDir) setOutputPath}
     */
    public void setOutputPath(Path path) {
        FileOutputFormat.setOutputPath(job, path);
    }

    /**
     * see {@link org.apache.hadoop.mapreduce.Job#waitForCompletion(boolean verbose) waitForCompletion}
     */
    public boolean waitForCompletion(boolean verbose) throws Exception {
        // have the required classes been set?
        if (inputGenericTypes == null) {
            System.err.println("Missing a call to setInputFormatClass");
            throw new Exception("InputFormat not defined");
        }
        if (mapperGenericTypes == null) {
            System.err.println("Missing a call to setMapperClass");
            throw new Exception("Mapper not defined");
        }
        if (reducerGenericTypes == null) {
            System.err.println("Missing a call to setReducerClass");
            throw new Exception("Reducer not defined");
        }

        // check the mapper against the input format
        if (inputGenericTypes[0] != mapperGenericTypes[0] || inputGenericTypes[1] != mapperGenericTypes[1]) {
            System.err.println("InputFormat => K="+inputGenericTypes[0]+" V="+inputGenericTypes[1]);
            System.err.println("Mapper      => K="+mapperGenericTypes[0]+" V="+mapperGenericTypes[1]);
            throw new Exception("Input format and Mapper are incompatible");
        }

        // check the combiner
        if (combinerGenericTypes != null) {
            if (combinerGenericTypes[0] != combinerGenericTypes[2]) {
                System.err.println("The combiner class must have keys of the same type on input and output");
                throw new Exception("Combiner keys are incorrect");
            }
            if (combinerGenericTypes[1] != combinerGenericTypes[3]) {
                System.err.println("The combiner class must have values of the same type on input and output");
                throw new Exception("Combiner values are incorrect");
            }
            if (combinerGenericTypes[0] != mapperGenericTypes[2] || combinerGenericTypes[1] != mapperGenericTypes[3]) {
                System.err.println("Mapper   => K="+mapperGenericTypes[2]+" V="+mapperGenericTypes[3]);
                System.err.println("Combiner => K="+combinerGenericTypes[0]+" V="+combinerGenericTypes[1]);
                throw new Exception("Mapper and Combiner are incompatible");
            }
            if (combinerGenericTypes[2] != reducerGenericTypes[0] || combinerGenericTypes[3] != reducerGenericTypes[1]) {
                System.err.println("Combiner => K="+combinerGenericTypes[2]+" V="+combinerGenericTypes[3]);
                System.err.println("Reducer  => K="+reducerGenericTypes[0]+" V="+reducerGenericTypes[1]);
                throw new Exception("Combiner and Reducer are incompatible");
            }
        }

        // check the reducer against the mapper
        if (mapperGenericTypes[2] != reducerGenericTypes[0] || mapperGenericTypes[3] != reducerGenericTypes[1]) {
            System.err.println("Mapper  => K="+mapperGenericTypes[2]+" V="+mapperGenericTypes[3]);
            System.err.println("Reducer => K="+reducerGenericTypes[0]+" V="+reducerGenericTypes[1]);
            throw new Exception("Mapper and Reducer are incompatible");
        }

        // finish configuring the job
        job.setMapOutputKeyClass((Class<?>) mapperGenericTypes[2]);
        job.setMapOutputValueClass((Class<?>) mapperGenericTypes[3]);
        job.setOutputKeyClass((Class<?>) reducerGenericTypes[2]);
        job.setOutputValueClass((Class<?>) reducerGenericTypes[3]);

        // run the job
        return job.waitForCompletion(verbose);
    }
}
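
/*
 * Minimal usage sketch, not part of the original class: a word-count driver that
 * exercises the checks performed by YarnJob. The names WordCountExample,
 * WordCountMapper and WordCountReducer are hypothetical and chosen only for this
 * example. Fully-qualified names are used because this class sits below YarnJob in
 * the same file, where no further import statements can be added. Note that the
 * mapper and reducer must extend Mapper/Reducer directly, with concrete type
 * arguments, otherwise getGenericSuperclass() does not return a ParameterizedType
 * and the type checks cannot be performed.
 */
class WordCountExample {

    // mapper: (LongWritable, Text) -> (Text, IntWritable), emits one count per word
    static class WordCountMapper extends Mapper<org.apache.hadoop.io.LongWritable,
            org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.IntWritable> {
        private final org.apache.hadoop.io.Text word = new org.apache.hadoop.io.Text();
        private final org.apache.hadoop.io.IntWritable one = new org.apache.hadoop.io.IntWritable(1);

        @Override
        protected void map(org.apache.hadoop.io.LongWritable key, org.apache.hadoop.io.Text value,
                Context context) throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (token.isEmpty()) continue;
                word.set(token);
                context.write(word, one);
            }
        }
    }

    // reducer: (Text, IntWritable) -> (Text, IntWritable); same types in and out, so also usable as combiner
    static class WordCountReducer extends Reducer<org.apache.hadoop.io.Text,
            org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text, org.apache.hadoop.io.IntWritable> {
        @Override
        protected void reduce(org.apache.hadoop.io.Text key,
                Iterable<org.apache.hadoop.io.IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (org.apache.hadoop.io.IntWritable v : values) sum += v.get();
            context.write(key, new org.apache.hadoop.io.IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        YarnJob job = new YarnJob(conf, "wordcount");
        job.setJarByClass(WordCountExample.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setInputDirRecursive(false);
        job.addInputPath(new Path(args[0]));
        job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
        job.setOutputPath(new Path(args[1]));
        job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);
        // waitForCompletion runs the type checks, then submits the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}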