hadoop - Concatenate S3 files to read in EMR


I have an S3 bucket of log files that I want to concatenate and then use as input to an EMR job. The log files live at paths like: bucket-name/[date]/product/out/[hour]/[minute-based-file]. I'd like to take all the minute-level logs in all the hour directories in all the date directories and concatenate them into one file, then use that file as the input to an EMR job. The original log files need to be preserved, and the new combined log file will be written to a different S3 bucket.

I tried using hadoop fs -getmerge on the EMR master node via SSH, but got this error:

This file system object (file:///) does not support access to the request path 's3://target-bucket-name/merged.log'
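As far as I can tell, getmerge always writes its destination to the local filesystem of the machine it runs on, which is why the file:/// filesystem complains about an s3:// path. The workaround is to merge to a local path first and then copy the result up to S3 with hadoop fs -put. A minimal sketch of the two steps (the bucket names and paths here are just placeholders):

hadoop fs -getmerge s3://bucket-name/2012-09-01/product/out/00/ /mnt/tmp/00.log
hadoop fs -put /mnt/tmp/00.log s3://target-bucket-name/daily-merged-logs/00.log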

The source S3 bucket has other files in it that I don't want to include. A wildcard match for the files I do want looks like this: s3n://bucket-name/*/product/out/*/log.*.
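Since the Hadoop shell commands accept glob patterns, a quick way to sanity-check which objects a pattern like that actually matches is to list them first (the bucket name here is a placeholder):

hadoop fs -ls 's3n://bucket-name/*/product/out/*/log.*'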

The purpose is to get around the problem of handing EMR tens or hundreds of thousands of small (10 KB to 3 MB) input files, and instead give it one large file that it can split more efficiently.

I ended up writing a script that wraps the Hadoop filesystem commands to do this.

#!/usr/bin/env ruby

require 'date'

# Merge minute-based log files into daily log files.
# Usage: run on the EMR master (e.g. ssh to the master and `ruby ~/merge-historical-logs.rb [from [to]]`)

source_bucket_name      = 's3-logs-bucket'
destination_bucket_name = 's3-merged-logs-bucket'

# Optional date inputs
min_date = if ARGV[0]
  min_date_args = ARGV[0].split('-').map { |item| item.to_i }
  Date.new(*min_date_args)
else
  Date.new(2012, 9, 1)
end

max_date = if ARGV[1]
  max_date_args = ARGV[1].split('-').map { |item| item.to_i }
  Date.new(*max_date_args)
else
  Date.today
end

# Set up directories
hdfs_logs_dir = '/mnt/tmp/logs'
local_tmp_dir = './_tmp_merges'

puts "cleaning filesystem"
system "hadoop fs -rmr #{hdfs_logs_dir}"
system "rm -rf #{local_tmp_dir}*"

puts "making hdfs directories"
system "hadoop fs -mkdir #{hdfs_logs_dir}"

# Progress backwards in time, from max_date to min_date
date = max_date
while date >= min_date
  # Format the date pieces
  year  = date.year
  month = "%02d" % date.month
  day   = "%02d" % date.day

  # Make a directory in HDFS to store the day's hourly logs
  today_hours_dir = "#{hdfs_logs_dir}/#{year}-#{month}-#{day}"
  puts "making today's hourly directory"
  system "hadoop fs -mkdir #{today_hours_dir}"

  # Break the day's hours into a few chunks;
  # this seems to avoid problems when running lots of getmerge commands in parallel
  [*(0..23)].each_slice(8).to_a.each do |hour_chunk|
    hour_chunk.each do |_hour|
      hour = "%02d" % _hour

      # Set up the args to merge the minute logs into hour logs
      source_file = "s3://#{source_bucket_name}/#{year}-#{month}-#{day}/product/out/#{hour}/"
      output_file = "#{local_tmp_dir}/#{hour}.log"

      # Launch each hour's getmerge in the background
      full_command = "hadoop fs -getmerge #{source_file} #{output_file}"
      puts "forking: #{full_command}"
      fork { system full_command }
    end

    # Wait for this batch of getmerges to finish
    Process.waitall
  end

  # Delete the local temp files hadoop created
  puts "removing temp files"
  system "rm #{local_tmp_dir}/.*.crc"

  # Move the local hourly logs to HDFS to free up local space
  puts "moving local logs to hdfs"
  system "hadoop fs -put #{local_tmp_dir}/* #{today_hours_dir}"

  puts "removing local logs"
  system "rm -rf #{local_tmp_dir}"

  # Merge the day's hourly logs into a single daily log file
  daily_log_file_name = "#{year}-#{month}-#{day}.log"
  daily_log_file_path = "#{local_tmp_dir}_day/#{daily_log_file_name}"
  puts "merging hourly logs into daily log"
  system "hadoop fs -getmerge #{today_hours_dir}/ #{daily_log_file_path}"

  # Write the daily log file to the destination S3 bucket
  puts "writing daily log to s3"
  system "hadoop fs -put #{daily_log_file_path} s3://#{destination_bucket_name}/daily-merged-logs/#{daily_log_file_name}"

  # Remove the daily log locally
  puts "removing local daily logs"
  system "rm -rf #{local_tmp_dir}_day"

  # Remove the hourly logs from HDFS
  puts "removing hdfs hourly logs"
  system "hadoop fs -rmr #{today_hours_dir}"

  # Go back one day
  date -= 1
end
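For reference, I run it on the master over SSH, optionally passing from/to dates that match the ARGV parsing above (year-month-day, hyphen-separated); with no arguments it walks back from today to 2012-09-01. For example, to merge September through December 2012 (the dates here are just an illustration):

ruby ~/merge-historical-logs.rb 2012-09-01 2012-12-31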
