반응형
하둡의 로그는 log4j를 사용한다.
access_log는 서버에 요청된 모든 url을 기록한다.
전송 성공 : 200
전송 실패 : 200이 아닌 모든 값
400 경우 URL을 찾을 수 없다는 오류
setup, cleanup 함수를 활용한다.
package hadoop.MapReduce.success;
import hadoop.MapReduce.tool.WordCount2;
import hadoop.MapReduce.tool.WordCount2Mapper;
import hadoop.MapReduce.tool.WordCount2Reducer;
import lombok.extern.log4j.Log4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 맵리듀스를 실행하기 위한 Main 함수가 존재하는 자바 파일
* 드라이버 파일로 부름
*/
@Log4j
public class ResultCount extends Configuration implements Tool {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
log.info("분석할 폴더(파일) 및 분석 결과가 저장될 폴더를 입력해야 한다.");
System.exit(-1);
}
int exitCode = ToolRunner.run(new ResultCount(), args);
System.exit(exitCode);
}
@Override
public void setConf(Configuration conf) {
conf.set("AppName", "Send Result");
conf.set("resultCode", "200");
}
@Override
public Configuration getConf() {
Configuration conf = new Configuration();
this.setConf(conf);
return conf;
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
String appName = conf.get("AppName");
log.info("appName : " + appName);
// 맵리듀스 시랳ㅇ을 위한 잡 객체를 가져온다.
// 하둡이 실행되면, 기본적으로 잡 객체를 메모리에 올린다.
Job job = Job.getInstance(conf);
// 맵리듀스 잡이 시작되는 main 함수가 존재하는 파일 설정
job.setJarByClass(ResultCount.class);
// 맵리듀스 잡 이름 설정, 리소스 매니저 등 맵리듀스 실행 결과 및 로그 확인할 때 편리
job.setJobName(appName);
// 분석할 폴더(파일) -- 첫 번째 파라미터
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 분석 결과가 저장되는 폴더(파일) -- 두 번째 파라미터
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 맵리듀스의 맵 역할을 수행하는 Mapper 자바 파일 설정
job.setMapperClass(ResultCountMapper.class);
// 맵리듀스의 리듀스 역할을 수행하는 Reducer 자바 파일 설정
job.setReducerClass(ResultCountReducer.class);
// 분석 결과가 저장될 때 사용될 키의 데이터 타입
job.setOutputKeyClass(Text.class);
// 분석 결과가 저장될 때 사용될 값의 데이터 타입
job.setOutputValueClass(IntWritable.class);
// 맵리듀스 실행
boolean success = job.waitForCompletion(true);
return (success ? 0 : 1);
}
}
ResultCount.java
package hadoop.MapReduce.success;
import lombok.extern.log4j.Log4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
@Log4j
public class ResultCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
String appName = "";
String resultCode = "";
@Override
protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
this.appName = conf.get("AppName");
this.resultCode = conf.get("resultCode", "200");
log.info("[" + this.appName + "] 난 map 함수 실행하기 전에 1번만 실행되는 setup 함수다.");
}
@Override
protected void cleanup(Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
super.cleanup(context);
log.info("[" + this.appName + "] 난 에러나도 무조건 실행되는 cleanup 함수다.");
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] arr = line.split("\\W+");
int pos = arr.length - 2;
String result = arr[pos];
log.info("[" + this.appName + "] " + result);
if (resultCode.equals(result)) {
context.write(new Text(result), new IntWritable(1));
}
}
}
ResultCountMapper.java
package hadoop.MapReduce.success;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class ResultCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int resultCodeCount = 0;
for (IntWritable value : values) {
resultCodeCount += value.get();
}
context.write(key, new IntWritable(resultCodeCount));
}
}
ResultCountReducer.java
새로 빌드하고 다시 hadoop에 올려준다.
hadoop jar mr.jar hadoop.MapReduce.success.ResultCount /access_log /result10
hadoop fs -ls /result10
hadoop fs -cat /result10/*
실행 결과를 확인한다.
반응형
'Data Base > Hadoop' 카테고리의 다른 글
[Hadoop] : Map 프로그래밍 (0) | 2022.05.17 |
---|---|
[Hadoop] : Combiner, 컴바이너 (0) | 2022.05.10 |
[Hadoop] : 맵리듀스 제어 함수, setup, cleanup (0) | 2022.05.03 |
[Hadoop] : ToolRunner 사용하기 (0) | 2022.05.03 |
[Hadoop] : 하둡, 맵리듀스 프로젝트 IP 별 호출 빈도 수 분석하기 (0) | 2022.04.28 |
댓글