#!/usr/bin/env python
""" Hadoop Bootstrap Script (based on hadoop 0.20.203 release) """
import os, sys
import pdb
import urllib
import subprocess
import logging
import uuid
import shutil
import time
import signal
from optparse import OptionParser

logging.basicConfig(level=logging.DEBUG)

# For automatic Download and Installation
HADOOP_DOWNLOAD_URL = "http://apache.linux-mirror.org//hadoop/common/hadoop-1.0.0/hadoop-1.0.0.tar.gz"
WORKING_DIRECTORY = os.path.join(os.getcwd(), "work")

# For using an existing installation
# HADOOP 0.22  - no webhdfs
# HADOOP 1.0.0 - has support for webhdfs
HADOOP_HOME = os.path.join(os.getcwd(), "work/hadoop-1.0.0")

STOP = False


def handler(signum, frame):
    """Signal handler: ask the main loop to shut Hadoop down."""
    logging.debug("Signal caught. Stop Hadoop")
    global STOP
    STOP = True


class HadoopBootstrap(object):

    def __init__(self, working_directory):
        self.working_directory = working_directory
        self.jobid = "hadoop-conf-" + str(uuid.uuid1())
        self.job_working_directory = os.path.join(WORKING_DIRECTORY, self.jobid)
        self.job_conf_dir = os.path.join(self.job_working_directory, "conf")
        self.job_name_dir = os.path.join(self.job_working_directory, "name")
        self.job_log_dir = os.path.join(self.job_working_directory, "logs")
        try:
            # conf/ is created later by copytree, name/ by "namenode -format"
            os.makedirs(self.job_working_directory)
            os.makedirs(self.job_log_dir)
        except OSError:
            pass

    def get_core_site_xml(self, hostname):
        return """<?xml version="1.0"?>
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://%s:9000</value>
    </property>
</configuration>""" % (hostname)

    def get_hdfs_site_xml(self, hostname, name_dir):
        return """<?xml version="1.0"?>
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.name.dir</name>
        <value>%s</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir.perm</name>
        <value>700</value>
        <description>Permissions for the directories on the local filesystem where
        the DFS data node stores its blocks. The permissions can either be
        octal or symbolic.</description>
    </property>
    <property>
        <name>dfs.webhdfs.enabled</name>
        <value>true</value>
    </property>
</configuration>""" % (name_dir)

    def get_mapred_site_xml(self, hostname):
        return """<?xml version="1.0"?>
<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>%s:9001</value>
    </property>
</configuration>""" % (hostname)

    def get_pbs_allocated_nodes(self):
        """Return the hostnames allocated by PBS, or None outside a PBS job."""
        pbs_node_file = os.environ.get("PBS_NODEFILE")
        if pbs_node_file is None:
            return None
        f = open(pbs_node_file)
        nodes = f.readlines()
        f.close()
        # normalize to one hostname per line, each terminated by a newline
        return [node.strip() + "\n" for node in nodes]

    def configure_hadoop(self):
        logging.debug("Configure Hadoop")
        shutil.copytree(os.path.join(HADOOP_HOME, "conf"), self.job_conf_dir)
        master = "localhost"
        nodes = self.get_pbs_allocated_nodes()
        if nodes is not None:
            master = nodes[0].strip()
            master_file = open(os.path.join(self.job_working_directory, "conf/masters"), "w")
            master_file.write(master)
            master_file.close()
            slave_file = open(os.path.join(self.job_working_directory, "conf/slaves"), "w")
            slave_file.writelines(nodes)
            slave_file.close()
            logging.debug("Hadoop cluster nodes: " + str(nodes))
        core_site_file = open(os.path.join(self.job_working_directory, "conf/core-site.xml"), "w")
        core_site_file.write(self.get_core_site_xml(master))
        core_site_file.close()
        hdfs_site_file = open(os.path.join(self.job_working_directory, "conf/hdfs-site.xml"), "w")
        hdfs_site_file.write(self.get_hdfs_site_xml(master, self.job_name_dir))
        hdfs_site_file.close()
        mapred_site_file = open(os.path.join(self.job_working_directory, "conf/mapred-site.xml"), "w")
        mapred_site_file.write(self.get_mapred_site_xml(master))
        mapred_site_file.close()

    def start_hadoop(self):
        logging.debug("Start Hadoop")
        if "HADOOP_CONF_DIR" not in os.environ or not os.path.exists(os.environ["HADOOP_CONF_DIR"]):
            self.set_env()
            format_command = os.path.join(HADOOP_HOME, "bin/hadoop") + " --config " + self.job_conf_dir + " namenode -format"
            logging.debug("Execute: %s" % format_command)
            os.system(format_command)
        else:
            logging.debug("Don't format namenode. Reconnect to existing namenode")
        start_command = os.path.join(HADOOP_HOME, "bin/start-all.sh")
        logging.debug("Execute: %s" % start_command)
        os.system(start_command)
        print("Hadoop started, please set HADOOP_CONF_DIR to:\nexport HADOOP_CONF_DIR=%s" % self.job_conf_dir)

    def stop_hadoop(self):
        logging.debug("Stop Hadoop")
        self.set_env()
        stop_command = os.path.join(HADOOP_HOME, "bin/stop-all.sh")
        logging.debug("Execute: %s" % stop_command)
        os.system(stop_command)

    def start(self):
        if "HADOOP_CONF_DIR" not in os.environ or not os.path.exists(os.environ["HADOOP_CONF_DIR"]):
            self.configure_hadoop()
        else:
            logging.debug("Existing Hadoop conf dir? %s" % os.environ["HADOOP_CONF_DIR"])
            self.job_conf_dir = os.environ["HADOOP_CONF_DIR"]
            self.job_log_dir = os.path.join(self.job_conf_dir, "../log")
            self.job_name_dir = os.path.join(self.job_conf_dir, "../name")
        self.start_hadoop()

    def stop(self):
        if "HADOOP_CONF_DIR" in os.environ and os.path.exists(os.environ["HADOOP_CONF_DIR"]):
            self.job_conf_dir = os.environ["HADOOP_CONF_DIR"]
            self.job_log_dir = os.path.join(self.job_conf_dir, "../log")
        self.stop_hadoop()

    def set_env(self):
        logging.debug("Export HADOOP_CONF_DIR to %s" % self.job_conf_dir)
        os.environ["HADOOP_CONF_DIR"] = self.job_conf_dir
        logging.debug("Export HADOOP_LOG_DIR to %s" % self.job_log_dir)
        os.environ["HADOOP_LOG_DIR"] = self.job_log_dir


#########################################################
#  main                                                 #
#########################################################
if __name__ == "__main__":
    signal.signal(signal.SIGALRM, handler)
    signal.signal(signal.SIGABRT, handler)
    signal.signal(signal.SIGQUIT, handler)
    signal.signal(signal.SIGINT, handler)

    parser = OptionParser()
    parser.add_option("-s", "--start", action="store_true", dest="start",
                      help="start Hadoop", default=True)
    parser.add_option("-q", "--quit", action="store_false", dest="start",
                      help="terminate Hadoop")
    parser.add_option("-c", "--clean", action="store_true", dest="clean",
                      help="clean HDFS datanodes after termination")

    # download and unpack Hadoop if no installation is found in HADOOP_HOME
    if not os.path.exists(HADOOP_HOME):
        logging.debug("Download Hadoop")
        try:
            os.makedirs(WORKING_DIRECTORY)
        except OSError:
            pass
        opener = urllib.FancyURLopener({})
        opener.retrieve(HADOOP_DOWNLOAD_URL, os.path.join(WORKING_DIRECTORY, "hadoop.tar.gz"))
        logging.debug("Install Hadoop")
        os.chdir(WORKING_DIRECTORY)
        os.system("tar -xzf hadoop.tar.gz")

    (options, args) = parser.parse_args()

    hadoop = HadoopBootstrap(WORKING_DIRECTORY)
    if options.start:
        hadoop.start()
    else:
        hadoop.stop()
        if options.clean:
            hdfs_tmp_dir = "/tmp/hadoop-" + os.getlogin()
            logging.debug("delete: " + hdfs_tmp_dir)
            shutil.rmtree(hdfs_tmp_dir)
        sys.exit(0)

    print("Finished launching of Hadoop Cluster - Sleeping now")
    # marker file signalling that the cluster is up
    f = open(os.path.join(WORKING_DIRECTORY, "started"), "w")
    f.close()

    # sleep until a signal flips STOP, then shut the cluster down
    while not STOP:
        logging.debug("stop: " + str(STOP))
        time.sleep(10)

    hadoop.stop()
    os.remove(os.path.join(WORKING_DIRECTORY, "started"))
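
# Usage sketch, derived from the OptionParser flags and the main block above.
# The file name "bootstrap_hadoop.py" is an assumption for illustration only;
# the script does not state its own name.
#
#   python bootstrap_hadoop.py            # download/configure/start Hadoop, then sleep
#   export HADOOP_CONF_DIR=<printed path> # point Hadoop clients at the generated config
#   python bootstrap_hadoop.py -q         # terminate a running cluster
#   python bootstrap_hadoop.py -q -c      # terminate and delete /tmp/hadoop-<user>
#
# Sending SIGINT or SIGQUIT (e.g. Ctrl-C) to the sleeping foreground process
# also triggers a clean shutdown via the signal handler.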