hadoop - Concatenate S3 files to read in EMR
I have an S3 bucket of log files that I want to concatenate and use as the input to an EMR job. The log files live in paths like: bucket-name/[date]/product/out/[hour]/[minute-based-file]. I'd like to take all the minute logs in the hour directories in the date directories and concatenate them into one file, and then use that file as the input to the EMR job. The original log files need to be preserved, and the new combined log file will be written to a different S3 bucket.
I tried using hadoop fs -getmerge on the EMR master node via SSH, but got this error:

This file system object (file:///) does not support access to the request path 's3://target-bucket-name/merged.log'
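The error comes from the fact that getmerge only accepts a local path as its destination, so it can't write the combined file directly to S3. One workaround is to merge into a local file first and then copy it up with hadoop fs -put. A minimal sketch, with an example date and hour plugged into the path layout above and /mnt/tmp assumed as a scratch directory:

    hadoop fs -getmerge s3n://bucket-name/2012-09-01/product/out/05/ /mnt/tmp/merged.log
    hadoop fs -put /mnt/tmp/merged.log s3://target-bucket-name/merged.log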
The source S3 bucket has other files in it, so I don't want to include all of its files. The wildcard match looks like this: s3n://bucket-name/*/product/out/*/log.*
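Hadoop's filesystem shell expands glob patterns itself, so one way to sanity-check that the wildcard selects only the intended files is a plain listing before merging anything (the quotes keep the local shell from expanding the * itself):

    hadoop fs -ls 's3n://bucket-name/*/product/out/*/log.*'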
The purpose is to get around the problem of feeding EMR tens or hundreds of thousands of small (10 KB to 3 MB) input files, and instead give it one large file that it can split more efficiently.
I ended up writing a script that wraps the Hadoop filesystem commands to do this.
#!/usr/bin/env ruby

require 'date'

# Merge minute-based log files into daily log files
# Usage: run on the EMR master node, e.g. ssh to the master and run
#   `ruby ~/merge-historical-logs.rb [from [to]]`

source_bucket_name      = 's3-logs-bucket'
destination_bucket_name = 's3-merged-logs-bucket'

# Optional date inputs (YYYY-MM-DD)
min_date = if ARGV[0]
  min_date_args = ARGV[0].split('-').map { |item| item.to_i }
  Date.new(*min_date_args)
else
  Date.new(2012, 9, 1)
end
max_date = if ARGV[1]
  max_date_args = ARGV[1].split('-').map { |item| item.to_i }
  Date.new(*max_date_args)
else
  Date.today
end

# Set up directories
hdfs_logs_dir = '/mnt/tmp/logs'
local_tmp_dir = './_tmp_merges'

puts "Cleaning filesystem"
system "hadoop fs -rmr #{hdfs_logs_dir}"
system "rm -rf #{local_tmp_dir}*"

puts "Making HDFS directories"
system "hadoop fs -mkdir #{hdfs_logs_dir}"

# Work backwards from the max date to the min date
date = max_date
while date >= min_date
  # Format the date pieces
  year  = date.year
  month = "%02d" % date.month
  day   = "%02d" % date.day

  # Make a directory in HDFS to store this day's hourly logs
  today_hours_dir = "#{hdfs_logs_dir}/#{year}-#{month}-#{day}"
  puts "Making today's hourly directory"
  system "hadoop fs -mkdir #{today_hours_dir}"

  # Break the day's hours into a few chunks; this seems to avoid problems
  # when running lots of getmerge commands in parallel
  [*(0..23)].each_slice(8).to_a.each do |hour_chunk|
    hour_chunk.each do |_hour|
      hour = "%02d" % _hour

      # Set up the args to merge this hour's minute logs into an hourly log
      source_file = "s3://#{source_bucket_name}/#{year}-#{month}-#{day}/product/out/#{hour}/"
      output_file = "#{local_tmp_dir}/#{hour}.log"

      # Launch each hour's getmerge in the background
      full_command = "hadoop fs -getmerge #{source_file} #{output_file}"
      puts "Forking: #{full_command}"
      fork { system full_command }
    end

    # Wait for this batch of getmerges to finish
    Process.waitall
  end

  # Delete the local .crc temp files Hadoop created
  puts "Removing temp files"
  system "rm #{local_tmp_dir}/.*.crc"

  # Move the local hourly logs into HDFS to free up local space
  puts "Moving local logs to HDFS"
  system "hadoop fs -put #{local_tmp_dir}/* #{today_hours_dir}"

  puts "Removing local logs"
  system "rm -rf #{local_tmp_dir}"

  # Merge the day's hourly logs into a single daily log file
  daily_log_file_name = "#{year}-#{month}-#{day}.log"
  daily_log_file_path = "#{local_tmp_dir}_day/#{daily_log_file_name}"

  puts "Merging hourly logs into daily log"
  system "hadoop fs -getmerge #{today_hours_dir}/ #{daily_log_file_path}"

  # Write the daily log file up to the destination S3 bucket
  puts "Writing daily log to S3"
  system "hadoop fs -put #{daily_log_file_path} s3://#{destination_bucket_name}/daily-merged-logs/#{daily_log_file_name}"

  # Remove the daily log locally
  puts "Removing local daily logs"
  system "rm -rf #{local_tmp_dir}_day"

  # Remove the hourly logs from HDFS
  puts "Removing HDFS hourly logs"
  system "hadoop fs -rmr #{today_hours_dir}"

  # Step back one day
  date -= 1
end
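To run it for a specific historical range, pass the from/to dates in YYYY-MM-DD form; with no arguments it defaults to 2012-09-01 through today. For example, to merge September 2012:

    ruby ~/merge-historical-logs.rb 2012-09-01 2012-09-30

The hours are merged in batches of eight forked getmerge processes, since launching all 24 in parallel at once seemed to cause problems.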