diff --git a/constants.py b/constants.py index 862d840..d919535 100644 --- a/constants.py +++ b/constants.py @@ -285,6 +285,23 @@ class Postgres(object): JVMSTATS = "jvmStats" KEYSPACE_STATS = "keyspaceStats" +# oozie plugin constants +OOZIEHOST = 'ooziehost' +OOZIEPORT = 'oozieport' +JOB_HISTORY_SERVER = 'job_history_server' +JOB_HISTORY_PORT = 'job_history_port' +RESOURCE_MANAGER = 'resource_manager' +RESOURCE_MANAGER_PORT = 'resource_manager_port' +TIMELINE_SERVER = 'timeline_server' +TIMELINE_PORT = 'timeline_port' + +# namenode plugin constants +NAMENODE = "namenode_host" +NAMENODE_PORT = "namenode_port" + # yarn_stat plugin constants +YARN_NODE = "yarn_node" +CLUSTER = "cluster" + # ESA constants class ESA(object): HOST_CONFIG_FILE = '/opt/esa_conf.json' diff --git a/http_request.py b/http_request.py new file mode 100644 index 0000000..965103a --- /dev/null +++ b/http_request.py @@ -0,0 +1,68 @@ +import requests +import collectd +#from requests_kerberos import HTTPKerberosAuth, DISABLED +#from configuration import * +#import logging + +#collectd.= logging.getLogger(__name__) + +kerberos_auth = None + +def kerberos_initialize(): + global kerberos_auth + if not kerberos_auth: + kerberos_auth = HTTPKerberosAuth(mutual_authentication=DISABLED, force_preemptive=True, principal=kerberos['principal']) + +def http_request(address, port, location, user="", pw="", kerberos=False, scheme='https', params=None): + global kerberos_auth + uri = '{0}://{1}:{2}{3}'.format(scheme, address, port, location) + try: + if kerberos: + r = requests.get(uri, auth=kerberos_auth, verify=False, params=params) + elif user == "": + r = requests.get(uri, verify=False, params=params) + else: + r = requests.get(uri, auth=(user, pw), params=params) + r.raise_for_status() + except requests.exceptions.HTTPError as errh: + collectd.error("Http Error: {0}".format(uri)) + return + except requests.exceptions.ConnectionError as errc: + collectd.error("Error Connecting: {0}".format(uri)) + return + 
except requests.exceptions.Timeout as errt: + collectd.error("Timeout Error: {0}".format(uri)) + return + except requests.exceptions.RequestException as err: + collectd.error("Oops: Some HTTP error {0}".format(uri)) + return + + json_data = r.json() + return json_data + + +def http_post(address, port, location, data, headers): + + uri = 'http://{0}:{1}{2}'.format(address, port, location) + timeout = 30 + + try: + collectd.debug("Data from http_post {0}".format(data)) + r = requests.post(uri, data=data, headers=headers, timeout=timeout) + r.raise_for_status() + except requests.exceptions.HTTPError as errh: + collectd.error("Http Error: {0}".format(uri)) + return + except requests.exceptions.ConnectionError as errc: + collectd.error("Error Connecting: {0}".format(uri)) + return + except requests.exceptions.Timeout as errt: + collectd.error("Timeout Error: {0}".format(uri)) + return + except requests.exceptions.RequestException as err: + collectd.error("Oops: Some HTTP error {0}".format(uri)) + return + + collectd.debug("HTTP post was successful") + json_data = r.json() + return json_data diff --git a/metrics.py b/metrics.py new file mode 100644 index 0000000..bd88227 --- /dev/null +++ b/metrics.py @@ -0,0 +1,225 @@ +from bisect import * +from utilities import * +#from buildData import prepare_task_stats_by_timepoint + +logger = logging.getLogger(__name__) + +reduce_start = 70.0 + +tasks_by_time = { + "numOfDataPoints": 25, + "minimumInterval": 5 # seconds +} + +def prepare_task_stats_by_timepoint(tp_start, tp_end, map_count, reduce_count, job_id, wfaId, wfId, wfName, wfaName): + return { + "wfId": wfId, + "wfaId": wfaId, + "wfName": wfName, + "wfaName": wfaName, + "jobId": job_id, + 'timePeriodStart': tp_start, + 'timePeriodEnd': tp_end, + "mapTaskCount": map_count, + "reduceTaskCount": reduce_count, + 'duration': tp_end - tp_start, + "_plugin": "hadoop", + "_documentType": "taskCounts", + "_tag_appName": "hadoop" + } + +def get_wait_time(job_json, tasks_reduce, 
def get_wait_time(job_json, tasks_reduce, tasks_map):
    """Return the total scheduler wait time for a MapReduce job.

    Map wait = (last map finishTime) - submitTime - (longest map elapsedTime).
    Reduce wait is measured from the finish of the last map that must complete
    before reduces may start (module-level `reduce_start` percent of the maps),
    clamped to 0. Returns 0 when there are no map tasks.
    """
    wait_time_total = 0
    if tasks_map:
        tasks_map_sorted = sorted(tasks_map, key=lambda k: k['finishTime'])
        max_finish_time_map = max(t['finishTime'] for t in tasks_map)
        max_map_elapsed_time = max(t['elapsedTime'] for t in tasks_map)
        schedule_time = job_json["submitTime"]
        wait_time_map = max_finish_time_map - schedule_time - max_map_elapsed_time
        wait_time_reduce = 0
        if tasks_reduce:
            # Number of maps that must finish before the first reduce starts.
            map_before_reduce = int(math.ceil(len(tasks_map_sorted) * reduce_start / 100))
            if map_before_reduce < len(tasks_map_sorted):
                max_finish_time_reduce = max(t['finishTime'] for t in tasks_reduce)
                max_reduce_elapsed_time = max(t['elapsedTime'] for t in tasks_reduce)
                last_map_before_reduce_finishtime = tasks_map_sorted[map_before_reduce - 1]['finishTime']
                wait_time_reduce = max_finish_time_reduce - last_map_before_reduce_finishtime - max_reduce_elapsed_time
                if wait_time_reduce < 0:
                    wait_time_reduce = 0
        wait_time_total = wait_time_map + wait_time_reduce

    return wait_time_total


def percentile(N, percent, key=lambda x: x):
    """Return the linearly interpolated *percent* (0..1) percentile of the
    already-sorted list *N*, or None if *N* is empty. *key* extracts the value."""
    if not N:
        return None
    k = (len(N) - 1) * percent
    f = math.floor(k)
    c = math.ceil(k)
    if f == c:
        return key(N[int(k)])
    d0 = key(N[int(f)]) * (c - k)
    d1 = key(N[int(c)]) * (k - f)
    return d0 + d1


def find_outliers(nums):
    """Return the Tukey upper fence (Q3 + 1.5*IQR) of the sorted list *nums*.

    Returns None for an empty input (the original raised a TypeError on
    None - None). The unused lower fence computation was dropped.
    """
    q75 = percentile(nums, 0.75)
    q25 = percentile(nums, 0.25)
    if q75 is None or q25 is None:
        return None
    iqr15 = (q75 - q25) * 1.5
    return q75 + iqr15


def find_stragglers_runtime(tasks_list):
    """Mark straggler tasks in place via task['isStraggler'] = 0/1.

    A straggler's elapsedTime lies above the Tukey upper fence.
    BUG FIX: flags are assigned following the elapsed-time-sorted order; the
    original enumerated the UNSORTED input list, so the 0/1 flags landed on
    arbitrary tasks. Also a no-op on an empty list (original crashed).
    """
    if not tasks_list:
        return
    tasks_sorted_elapsed_time = sorted(tasks_list, key=lambda k: k['elapsedTime'])
    elapsed_time_list = [t['elapsedTime'] for t in tasks_sorted_elapsed_time]
    upper = find_outliers(elapsed_time_list)
    straggler_index = bisect(elapsed_time_list, upper)
    for idx, task in enumerate(tasks_sorted_elapsed_time):
        task["isStraggler"] = 0 if idx < straggler_index else 1


def calculate_scheduling_delays(workflow, wfa_list):
    """Annotate *workflow* and its actions with scheduling-delay metrics.

    wfSchedulingDelay  = workflow runtime - sum of action runtimes
    jobSchedulingDelay = workflow runtime - sum of yarn job runtimes
    Per action: submitDelay = first real job's submitTime - action start;
    jobDelay = action runtime - sum of its own job runtimes. The first yarn
    job of an action with an externalChildID is the launcher, whose duration
    encapsulates its children, so it is skipped.
    """
    wf_runtime = get_unix_timestamp(workflow['endTime']) - get_unix_timestamp(workflow['startTime'])
    sigma_wfa_runtime = 0
    sigma_job_runtime = 0
    workflow['wfSchedulingDelay'] = 0
    workflow['jobSchedulingDelay'] = 0
    if wfa_list:
        for wfadict in wfa_list:
            wfa = wfadict['action']
            if wfa['endTime'] and wfa['startTime']:
                sigma_job_runtime_wfa_level = 0
                wfa['submitDelay'] = 0
                wfa['jobDelay'] = 0
                sigma_wfa_runtime += wfa['endTime'] - wfa['startTime']
                first_job = None
                index = 0
                for job_info in wfadict['yarnJobs']:
                    # Launcher job duration encapsulates its children; skip it.
                    if index == 0 and wfa['externalChildID'] and wfa['externalChildID'] != '-':
                        index += 1
                        continue
                    if not first_job:
                        first_job = job_info
                    sigma_job_runtime += job_info['job']['finishTime'] - job_info['job']['startTime']
                    sigma_job_runtime_wfa_level += job_info['job']['finishTime'] - job_info['job']['startTime']
                    index += 1
                if first_job:
                    wfa['submitDelay'] = first_job['job']['submitTime'] - wfa['startTime']
                    wfa['jobDelay'] = (wfa['endTime'] - wfa['startTime']) - sigma_job_runtime_wfa_level
    if sigma_wfa_runtime > 0:
        workflow['wfSchedulingDelay'] = wf_runtime - sigma_wfa_runtime
    if sigma_job_runtime > 0:
        workflow['jobSchedulingDelay'] = wf_runtime - sigma_job_runtime


def find_mapper_spill(tasks_list):
    """Return total spilledRecords / total mapOutputRecords over *tasks_list*.

    Returns 0 when there is no map output. Uses true division: the original's
    `/` floored the ratio to 0 under Python 2 integer division.
    """
    spilled_records_from_tasks = 0
    output_records_from_tasks = 0
    for task in tasks_list:
        spilled_records_from_tasks += task["spilledRecords"]
        output_records_from_tasks += task["mapOutputRecords"]

    if output_records_from_tasks == 0:
        return 0
    return float(spilled_records_from_tasks) / output_records_from_tasks


def find_shuffle_ratio(tasks_list):
    """Return 2*shuffle / (elapsed - shuffle - merge) over *tasks_list*.

    Returns 0 when the denominator is 0. True division (Python 2 safe).
    """
    elapsed_shuffle_time = 0
    elapsed_time = 0
    elapsed_merge_time = 0
    for task in tasks_list:
        elapsed_shuffle_time += task["elapsedShuffleTime"]
        elapsed_merge_time += task["elapsedMergeTime"]
        elapsed_time += task["elapsedTime"]

    denominator = elapsed_time - elapsed_shuffle_time - elapsed_merge_time
    if denominator == 0:
        return 0
    return (elapsed_shuffle_time * 2.0) / denominator


def find_map_speed(tasks_list):
    """Return total hdfsBytesRead / total (finishTime - startTime) across tasks.

    Returns 0 for a zero total duration. True division (Python 2 safe).
    """
    hdfs_bytes_read = 0
    finish_time = 0
    start_time = 0

    for task in tasks_list:
        hdfs_bytes_read += task["hdfsBytesRead"]
        finish_time += task["finishTime"]
        start_time += task["startTime"]

    if (finish_time - start_time) == 0:
        return 0
    return float(hdfs_bytes_read) / (finish_time - start_time)


def find_sort_ratio(tasks_list):
    """Return 2*merge / (elapsed - shuffle - merge) over *tasks_list*.

    Returns 0 when the denominator is 0. True division (Python 2 safe).
    """
    elapsed_shuffle_time = 0
    elapsed_time = 0
    elapsed_merge_time = 0
    for task in tasks_list:
        elapsed_shuffle_time += task["elapsedShuffleTime"]
        elapsed_merge_time += task["elapsedMergeTime"]
        elapsed_time += task["elapsedTime"]

    denominator = elapsed_time - elapsed_shuffle_time - elapsed_merge_time
    if denominator == 0:
        return 0
    return (elapsed_merge_time * 2.0) / denominator


def calculate_taskcount_by_time_points(job_info, task_list, wfaId, wfId, wfName, wfaName):
    """Bucket running MAP/REDUCE task counts into up to
    tasks_by_time['numOfDataPoints'] intervals of at least
    tasks_by_time['minimumInterval'] seconds.

    A task counts toward an interval when it starts in it, finishes in it, or
    spans it entirely. Returns a list of taskCounts documents, or None if empty.
    """
    job_start_time = job_info['startTime']
    job_finish_time = job_info['finishTime'] + 1
    # float() so ceil acts on a real quotient under Python 2 as well;
    # the original floored first, producing more intervals than intended.
    interval_to_plot = int(math.ceil((job_finish_time - job_start_time) / float(tasks_by_time['numOfDataPoints'])))
    if interval_to_plot < tasks_by_time['minimumInterval']:
        interval_to_plot = tasks_by_time['minimumInterval']
    sorted_task_list = sorted(task_list, key=lambda x: x['startTime'])
    tpTaskStats = []
    for x in range(job_start_time, job_finish_time, interval_to_plot):
        time_interval_start = x
        time_interval_end = x + interval_to_plot if x + interval_to_plot < job_finish_time else job_finish_time
        map_count = 0
        reduce_count = 0
        for task in sorted_task_list:
            if task['startTime'] and task['finishTime']:
                if (task['startTime'] >= time_interval_start and task['startTime'] < time_interval_end) or \
                        (task['finishTime'] > time_interval_start and task['finishTime'] <= time_interval_end) or \
                        (task['startTime'] < time_interval_start and task['finishTime'] > time_interval_end):
                    if task['type'] == 'MAP':
                        map_count += 1
                    else:
                        reduce_count += 1
        stat = prepare_task_stats_by_timepoint(time_interval_start, time_interval_end, map_count, reduce_count,
                                               job_info['jobId'], wfaId, wfId, wfName, wfaName)
        tpTaskStats.append(stat)
    logger.debug("tpStats {0}".format(tpTaskStats))

    return tpTaskStats if tpTaskStats else None
>= time_interval_start and task['startTime'] < time_interval_end) or \ + (task['finishTime'] > time_interval_start and task['finishTime'] <= time_interval_end) or \ + (task['startTime'] < time_interval_start and task['finishTime'] > time_interval_end): + if task['type'] == 'MAP': + map_count += 1 + else: + reduce_count += 1 + stat = prepare_task_stats_by_timepoint(time_interval_start, time_interval_end, map_count, reduce_count, + job_info['jobId'], wfaId, wfId, wfName, wfaName) + tpTaskStats.append(stat) + logger.debug("tpStats {0}".format(tpTaskStats)) + + return tpTaskStats if tpTaskStats else None diff --git a/oozie.py b/oozie.py new file mode 100644 index 0000000..b3cc950 --- /dev/null +++ b/oozie.py @@ -0,0 +1,330 @@ +import collectd +import operator +import signal +import time +from datetime import datetime +import json +import requests +from multiprocessing.dummy import Pool as ThreadPool +from metrics import * +from rest_api import * +from utils import * +from copy import deepcopy + +class Oozie: + def __init__(self): + self.ooziehost = None + self.oozieport = None + self.timeline_server = None + self.timeline_port = None + self.job_history_server = None + self.job_history_port = None + self.resource_manager = None + self.resource_manager_port = None + self.interval = None + self.with_threading = False + self.workflows_processed = 0 + self.wfs_processed = 0 + self.pool = None + self.THREAD_COUNT = 15 + self.url = None + self.out_file = '/var/log/hueristics/hadoop-plugin-test/test.json' + + + def read_config(self, cfg): + for children in cfg.children: + if children.key == 'interval': + self.interval = children.values[0] + elif children.key == 'ooziehost': + self.ooziehost = children.values[0] + elif children.key == 'oozieport': + self.oozieport = children.values[0] + elif children.key == 'job_history_server': + self.job_history_server = children.values[0] + elif children.key == 'job_history_port': + self.job_history_port = children.values[0] + elif 
children.key == 'resource_manager': + self.resource_manager = children.values[0] + elif children.key == 'resource_manager_port': + self.resource_manager_port = children.values[0] + elif children.key == 'timeline_server': + self.timeline_server = children.values[0] + elif children.key == 'timeline_port': + self.timeline_port = children.values[0] + + def get_unix_timestamp(self, date_text): + dateInput = date_text.rsplit(' ', 1) + utc = timezone(dateInput[1]) + datetimedata = datetime.strptime(dateInput[0], "%a, %d %b %Y %H:%M:%S") + utctimestamp = utc.localize(datetimedata) + #collectd.debug(utctimestamp.tzinfo) + return int(time.mktime(utctimestamp.timetuple())) + + def prepare_workflow(self, workflow): + return { + "wfName": workflow['appName'] if 'appName' in workflow else workflow['wfName'], + "wfId": workflow['id'] if 'id' in workflow else workflow['wfId'] , + "lastModTime": self.get_unix_timestamp(workflow['lastModTime']) if workflow['lastModTime'] is not None else None, + "createdTime": self.get_unix_timestamp(workflow['createdTime']) if workflow['createdTime'] is not None else None, + "startTime": self.get_unix_timestamp(workflow['startTime']) if workflow['startTime'] is not None else 0, + "endTime": self.get_unix_timestamp(workflow['endTime']) if workflow['endTime'] is not None else 0, + "status": workflow['status'], + "time": int(math.floor(time.time())), + "_plugin": "oozie", + "_documentType": "oozieWorkflows", + "_tag_appName": "oozie", + "wfSchedulingDelay": workflow['wfSchedulingDelay'] if "wfSchedulingDelay" in workflow else None, + "jobSchedulingDelay": workflow['jobSchedulingDelay'] if "jobSchedulingDelay" in workflow else None, + "workflowMonitorStatus": "init" + } + + def prepare_workflow_action_data(self, action, yarnJobId, workflowId, workflowName): + return { + "wfId": workflowId, + "wfName": workflowName, + "wfaName": action['name'], + "wfaId": action['id'], + "startTime": self.get_unix_timestamp(action['startTime']) if action['startTime'] is 
not None else 0, + "endTime": self.get_unix_timestamp(action['endTime']) if action['endTime'] is not None else 0, + "externalId": action['externalId'], + "externalChildID": yarnJobId, + "status": action['status'], + "externalStatus": action['externalStatus'], + "errorCode": action["errorCode"], + "type": action['type'], + "time": int(math.floor(time.time())), + "retries": action['retries'], + "_plugin": "oozie", + "_documentType": "oozieWorkflowActions", + "_tag_appName": "oozie" + } + + + def is_latest_oozie_job(self, lastjobdetails, latestjobdetails): + for workflow in lastjobdetails['workflows']: + if workflow['wfId'] == latestjobdetails['wfId']: + return False + return True + + def read_json(self): + with open("/opt/collectd/plugins/oozieworkflows.json", "r") as fp: + data = json.loads(fp.read()) + if 'workflows' not in data: + data['workflows'] = [] + fp.close() + return data + + def write_json(self, workflow): + data = self.read_json() + with open("/opt/collectd/plugins/oozieworkflows.json", "w") as fp: + data['workflows'].append(workflow) + json.dump(data, fp) + fp.close() + + def change_workflow_status(self, workflow): + workflows = self.read_json() + index = -1 + for i in range(0, len(workflows['workflows'])): + if workflows['workflows'][i]['wfId'] == workflow['wfId']: + index = i + break + if index != -1: + workflows['workflows'][i] = workflow + with open("/opt/collectd/plugins/oozieworkflows.json", "w") as fp: + json.dump(workflows, fp) + fp.close() + + + def processYarnJob(self, yarnJobId, oozieWorkflowId, oozieWorkflowName, oozieWorkflowActionId, oozieWorkflowActionName, post_data): + collectd.debug("Processing yarnJobId %s of workflow %s workflowId: %s ActionId:%s ActionName:%s" %(yarnJobId, oozieWorkflowName, oozieWorkflowId , + oozieWorkflowActionId, oozieWorkflowActionName )) + + job = yarnJobId + tpTaskStats = None + app = yarnJobId.replace("job_", "application_") + app_info = get_app_info(self.timeline_server, 
self.timeline_port,app,oozieWorkflowName, oozieWorkflowId, oozieWorkflowActionId, oozieWorkflowActionName) + if app_info['type'] != 'MAPREDUCE': + return app_info + job_info = get_job_info(self.job_history_server, self.job_history_port, job,oozieWorkflowName, oozieWorkflowId, oozieWorkflowActionId, oozieWorkflowActionName) + task_info = get_task_info(self.job_history_server, self.job_history_port, job, oozieWorkflowName, oozieWorkflowId, oozieWorkflowActionId, oozieWorkflowActionName) + task_ids = get_task_ids_by_job(self.job_history_server, self.job_history_port, job) + taskattempt_container_info = get_taskattempt_container_info(self.job_history_server, self.job_history_port, job, task_ids, oozieWorkflowName, oozieWorkflowId, oozieWorkflowActionId, oozieWorkflowActionName) + + + if taskattempt_container_info: + tasks_map = [] + tasks_reduce = [] + for task in taskattempt_container_info: + for task_attempt in task: + if task_attempt['type'] == 'MAP': + tasks_map.append(task_attempt) + elif task_attempt['type'] == 'REDUCE': + tasks_reduce.append(task_attempt) + + job_info["waitTime"] = get_wait_time(job_info, tasks_reduce, tasks_map) + #find_stragglers_runtime(tasks_map) + #find_stragglers_runtime(tasks_reduce) + hueristics = {} + #hueristics["mapper_spill"] = find_mapper_spill(tasks_map) + #hueristics["shuffle_ratio"] = find_shuffle_ratio(tasks_reduce) + #hueristics["sort_ratio"] = find_sort_ratio(tasks_reduce) + #hueristics["map_speed"] = find_map_speed(tasks_map) + hueristics["tasks_by_start_time"] = group_tasks_by_start_time(job, tasks_map + tasks_reduce) + hueristics["time"] = int(time.time()) + hueristics["jobId"] = job + hueristics_info = {"hueristics_" + job : hueristics} +# write_json_to_file(hueristics_info, self.out_file) + tpTaskStats = calculate_taskcount_by_time_points(job_info, tasks_map + tasks_reduce,wfId=oozieWorkflowId, + wfaId=oozieWorkflowActionId, wfName=oozieWorkflowName, + wfaName=oozieWorkflowActionName) + + yarnJobInfo = [] + if job_info is 
not None: + yarnJobInfo.append(job_info) + if app_info is not None: + yarnJobInfo.append(app_info) + if task_info is not None: + yarnJobInfo.extend(task_info) + if taskattempt_container_info is not None: + yarnJobInfo.extend(reduce(operator.concat, taskattempt_container_info)) + if tpTaskStats is not None: + yarnJobInfo.extend(tpTaskStats) + return yarnJobInfo + + + def process_workflow(self, workflow): + res_json = requests.get(self.url+'/job/%s' %workflow['wfId']) + if not res_json.ok: + collectd.error("Unable to get oozie jobs from %s server and status is %s" %(self.ooziehost, res_json.status_code)) + else: + res_data = res_json.json() + collectd.debug("action data is %s" %res_data) + if res_data['status'] == 'SUCCEEDED' or res_data['status'] == 'KILLED': + post_data = {"workflow" : workflow, + "workflowActions": []} + processed_data = [] + for action in res_data['actions']: + actionData = {"action": None, + "yarnJobs": []} + if (action['type'] == ':START:') or (action['type'] == ':END:'): + continue + if action['externalChildIDs']: + childIndex = 0 + for externalChildID in action['externalChildIDs'].split(','): + if childIndex == 0: + workflowActionData = self.prepare_workflow_action_data(action, externalChildID, + workflow['wfId'], workflow['wfName']) + actionData['action'] = workflowActionData + childIndex = childIndex + 1 + yarnJobInfo = self.processYarnJob(externalChildID, workflow['wfId'], workflow['wfName'],action['id'], action['name'], post_data) + if yarnJobInfo: + processed_data.extend(yarnJobInfo) + #actionData['yarnJobs'].append(yarnJobInfo) + else: + collectd.error("Don't have all info for wfaId %s" % action['id']) + else: + workflowActionData = self.prepare_workflow_action_data(action, action['externalId'], workflow['wfId'], workflow['wfName']) + actionData['action'] = workflowActionData + if action['externalId'] and action['externalId'] != '-': + yarnJobInfo = self.processYarnJob(action['externalId'], workflow['wfId'], workflow['wfName'], 
action['id'], action['name'], post_data) + if yarnJobInfo: +# actionData['yarnJobs'].append(yarnJobInfo) + processed_data.extend(yarnJobInfo) + else: + collectd.error("Don't have all info for wfaId %s" % action['id']) + if actionData['action']: + processed_data.append(actionData['action']) +# if post_data["workflowActions"]: +# res_data['workflowMonitorStatus'] = "processed" + # res_data = self.prepare_workflow(res_data) +# self.change_workflow_status(workflow) +# post_data["workflowActions"].extend() +# collectd.info("======== Processed bulk data is %s" %post_data) +# return post_data + if processed_data: + workflow['workflowMonitorStatus'] = "processed" + self.change_workflow_status(workflow) + processed_data.append(workflow) + return processed_data + elif res_data['status'] == 'SUCCEEDED': + res_data = self.prepare_workflow(res_data) + res_data['workflowMonitorStatus'] = "processed" + self.change_workflow_status(res_data) + return [res_data] + elif res_data['status'] != "RUNNING" and res_data['status'] != "SUSPENDED": + wf_data_to_send = self.prepare_workflow(res_data) + wf_data_to_send['workflowMonitorStatus'] = "processed" + self.change_workflow_status(wf_data_to_send) + return [wf_data_to_send] + + def read_workflows(self): + res_json = requests.get(self.url+'/jobs') + if not res_json.ok: + collectd.error("Unable to get oozie jobs from %s server and status is %s" %(self.ooziehost, res_json.status_code)) + else: + res_json = res_json.json() + if len(res_json['workflows']) == 0: + return None + data = self.read_json() + + result = [] + for workflow in res_json['workflows']: + worklow_data = self.prepare_workflow(workflow) + if not self.is_latest_oozie_job(data, worklow_data): + continue + self.write_json(worklow_data) + + res_data = self.read_json() + if self.with_threading: + r = self.pool.map_async(self.process_workflow,res_data['workflows'], callback=result) + r.wait() + result = r.get() + result = [r for r in result if r] + result = reduce(operator.concat, 
result) + else: + for workflow in res_data['workflows']: + if workflow['workflowMonitorStatus'] == 'processed': + continue + status_data = self.process_workflow(workflow) + if status_data: + result.extend(status_data) + return result + + + @staticmethod + def add_common_params(oozie_dict, doc_type): + hostname = gethostname() + timestamp = int(round(time.time())) + + oozie_dict[HOSTNAME] = hostname + oozie_dict[TIMESTAMP] = timestamp + oozie_dict[PLUGIN] = 'oozie' + oozie_dict[ACTUALPLUGINTYPE] = 'oozie' + oozie_dict[PLUGINTYPE] = doc_type + #oozie_dict[PLUGIN_INS] = doc_type + + @staticmethod + def dispatch_data(oozie_dict): + collectd.info("Plugin Oozie: Values: " + json.dumps(oozie_dict)) + dispatch(oozie_dict) + + def collect_data(self): + if self.with_threading: + self.pool = ThreadPool(self.THREAD_COUNT) + self.url = 'http://%s:%s/oozie/v1' %(self.ooziehost, self.oozieport) + oozie_dicts = self.read_workflows() + for oozie_dict in oozie_dicts: + collectd.info("Collectd info is %s" %oozie_dict) + self.add_common_params(oozie_dict, oozie_dict['_documentType']) + self.dispatch_data(deepcopy(oozie_dict)) + + def read(self): + self.collect_data() + + def read_temp(self): + collectd.unregister_read(self.read_temp) + collectd.register_read(self.read, interval=int(self.interval)) + +oozieinstance = Oozie() +collectd.register_config(oozieinstance.read_config) +collectd.register_read(oozieinstance.read_temp) diff --git a/rest_api.py b/rest_api.py new file mode 100644 index 0000000..3952101 --- /dev/null +++ b/rest_api.py @@ -0,0 +1,307 @@ +from http_request import * +import time +from utilities import * +import traceback + + +def get_job_info(host, port, job_id, wfName, wfId, wfaId, wfaName): + + location = host + port = port + path = "/ws/v1/history/mapreduce/jobs/{0}".format(job_id) + json_resp = http_request(location, port, path, scheme='http') + if json_resp is None: + return None + job_info = json_resp['job'] + + job_info['wfName'] = wfName + job_info['wfId'] = 
def get_job_info(host, port, job_id, wfName, wfId, wfaId, wfaName):
    """Fetch job details and counters from the MapReduce history server.

    Returns a flat 'jobStats' document tagged with the oozie workflow/action
    identifiers, with all times normalized from milliseconds to epoch seconds,
    or None when either HTTP request fails.
    """
    path = "/ws/v1/history/mapreduce/jobs/{0}".format(job_id)
    json_resp = http_request(host, port, path, scheme='http')
    if json_resp is None:
        return None
    job_info = json_resp['job']

    job_info['wfName'] = wfName
    job_info['wfId'] = wfId
    job_info['wfaId'] = wfaId
    job_info['wfaName'] = wfaName
    job_info['_plugin'] = "oozie"
    job_info['_documentType'] = "jobStats"
    job_info['_tag_appName'] = "oozie"
    job_info['time'] = int(time.time())
    # The history server reports all of these in milliseconds.
    for field in ('startTime', 'finishTime', 'submitTime', 'avgMapTime',
                  'avgReduceTime', 'avgShuffleTime', 'avgMergeTime'):
        job_info[field] = int(job_info[field] / 1000)

    job_info['jobId'] = job_info.pop('id')

    path_counters = "/ws/v1/history/mapreduce/jobs/{0}/counters".format(job_id)
    json_resp_counters = http_request(host, port, path_counters, scheme='http')
    if json_resp_counters is None:
        return None

    if json_resp_counters['jobCounters'].get('counterGroup') is not None:
        for jc in json_resp_counters['jobCounters']['counterGroup']:
            for counter in jc["counter"]:
                base = convert_camelcase(counter["name"], "_")
                job_info[base + "Total"] = counter["totalCounterValue"]
                job_info[base + "Reduce"] = counter["reduceCounterValue"]
                job_info[base + "Map"] = counter["mapCounterValue"]
    return job_info


def get_job_counters(host, port, job_id):
    """Return a flat 'jobStats' document of the job's counters, or None.

    Robustness fix: tolerates a response without 'counterGroup' the same way
    get_job_info does (the original raised KeyError here).
    """
    path = "/ws/v1/history/mapreduce/jobs/{0}/counters".format(job_id)
    json_resp = http_request(host, port, path, scheme='http')
    if json_resp is None:
        return None

    job_counters_json = {
        '_plugin': "oozie",
        '_documentType': "jobStats",
        '_tag_appName': "oozie",
        'time': int(time.time()),
    }
    for jc in json_resp['jobCounters'].get('counterGroup') or []:
        for counter in jc["counter"]:
            base = convert_camelcase(counter["name"], "_")
            job_counters_json[base + "Total"] = counter["totalCounterValue"]
            job_counters_json[base + "Reduce"] = counter["reduceCounterValue"]
            job_counters_json[base + "Map"] = counter["mapCounterValue"]

    return job_counters_json


def get_task_info(host, port, job_id, wfName, wfId, wfaId, wfaName):
    """Return a list of per-task 'taskStats' documents for *job_id*, or None.

    Each task document is tagged with the oozie workflow/action identifiers
    and its times are converted from milliseconds to seconds.
    """
    path = '/ws/v1/history/mapreduce/jobs/{0}'.format(job_id)
    json_resp_ts = http_request(host, port, path, scheme='http')
    if json_resp_ts is None:
        return None

    path = '/ws/v1/history/mapreduce/jobs/{0}/tasks'.format(job_id)
    json_resp_tasks = http_request(host, port, path, scheme='http')
    if json_resp_tasks is None:
        return None
    task_document_list = []
    for task in json_resp_tasks['tasks']['task']:
        task['wfName'] = wfName
        task['wfId'] = wfId
        task['wfaId'] = wfaId
        task['wfaName'] = wfaName
        task['_plugin'] = "oozie"
        task['_documentType'] = "taskStats"
        task['jobId'] = job_id
        task['name'] = json_resp_ts['job']['name']
        task['_tag_appName'] = "oozie"
        task['taskId'] = task.pop('id')
        task['time'] = int(time.time())
        task['startTime'] = int(task['startTime'] / 1000)
        task['finishTime'] = int(task['finishTime'] / 1000)
        task['elapsedTime'] = int(task['elapsedTime'] / 1000)
        task_document_list.append(task)

    return task_document_list


def get_app_info(host, port, app_id, wfName, wfId, wfaId, wfaName):
    """Return an 'appStats' document for *app_id* from the application history
    server, tagged with the oozie identifiers, or None on failure."""
    path = '/ws/v1/applicationhistory/apps/{0}/'.format(app_id)
    app_info = http_request(host, port, path, scheme='http')
    if app_info is None:
        return None

    app_info['wfName'] = wfName
    app_info['wfId'] = wfId
    app_info['wfaId'] = wfaId
    app_info['wfaName'] = wfaName
    app_info['_plugin'] = "oozie"
    app_info['_documentType'] = "appStats"
    app_info['_tag_appName'] = "oozie"
    app_info['time'] = int(time.time())
    # Convert times to epoch seconds from ms
    app_info['startedTime'] = int(app_info['startedTime'] / 1000)
    app_info['finishedTime'] = int(app_info['finishedTime'] / 1000)
    app_info['elapsedTime'] = int(app_info['elapsedTime'] / 1000)
    app_info['submittedTime'] = int(app_info['submittedTime'] / 1000)

    return app_info
"oozie" + app_info['time'] = int(time.time()) + # Convert times to epoch seconds from ms + app_info['startedTime'] = int(app_info['startedTime'] / 1000) + app_info['finishedTime'] = int(app_info['finishedTime'] / 1000) + app_info['elapsedTime'] = int(app_info['elapsedTime'] / 1000) + app_info['submittedTime'] = int(app_info['submittedTime'] / 1000) + + return app_info + + + +def get_task_ids_by_job(host, port, job_id): + location = host + port = port + path = '/ws/v1/history/mapreduce/jobs/{0}/tasks'.format(job_id) + json_resp_tasks = http_request(location, port, path, scheme='http') + if json_resp_tasks is None: + return None + task_list = json_resp_tasks['tasks']['task'] + task_id_list = [] + for task in task_list: + task_id_list.append(task['id']) + + return task_id_list + + +def get_container_info(host, port, job_id): + + app_id = job_id.replace("job", "application") + location = host + port = port + path = '/ws/v1/applicationhistory/apps/{0}/'.format(app_id) + json_resp_ts = http_request(location, port, path, scheme='http') + if json_resp_ts is None: + return None + + app_attempt = json_resp_ts['currentAppAttemptId'] + path = '/ws/v1/applicationhistory/apps/{0}/appattempts/'.format(app_id) + app_attempts = http_request(location, port, path, scheme='http') + container_attempt = [] + for attempt in app_attempts['appAttempt']: + path = '/ws/v1/applicationhistory/apps/{0}/appattempts/{1}/containers'.format(app_id, attempt['appAttemptId']) + json_resp_containers = http_request(location, port, path, scheme='http') + if json_resp_containers is None: + return None + container_list = json_resp_containers['container'] + container_attempt.append(container_list) + + containers_flat = [l for k in container_attempt for l in k] + + return containers_flat + + +def get_taskattempt_container_info(host, port, job_id, task_ids, wfName, wfId, wfaId, wfaName): + + try: + if task_ids is None: + return None + location = host + port = port + + #containers_list = 
get_container_info(job_id) + containers_list = [] + task_attempt_document_job = [] + for task in task_ids: + path = '/ws/v1/history/mapreduce/jobs/{0}/tasks/{1}/attempts'.format(job_id, task) + json_resp_tasks = http_request(location, port, path, scheme='http') + if json_resp_tasks is None: + return None + task_attempt_list = json_resp_tasks['taskAttempts']['taskAttempt'] + task_attempt_document = [] + for task_attempt in task_attempt_list: + task_attempt['wfName'] = wfName + task_attempt['wfId'] = wfId + task_attempt['wfaId'] = wfaId + task_attempt['wfaName'] = wfaName + task_attempt['_plugin'] = "oozie" + task_attempt['_documentType'] = 'taskAttemptStat' + task_attempt['_tag_appName'] = "oozie" + task_attempt['jobId'] = job_id + task_attempt['taskId'] = task + + task_attempt['time'] = int(time.time()) + task_attempt['startTime'] = int(task_attempt['startTime'] / 1000) + task_attempt['finishTime'] = int(task_attempt['finishTime'] / 1000) + task_attempt['elapsedTime'] = int(task_attempt['elapsedTime'] / 1000) + task_attempt['taskAttemptId'] = task_attempt.pop('id') + task_attempt['containerId'] = task_attempt.pop('assignedContainerId') + if 'shuffleFinishTime' in task_attempt: + task_attempt['shuffleFinishTime'] = int(task_attempt['shuffleFinishTime'] / 1000) + if 'mergeFinishTime' in task_attempt: + task_attempt['mergeFinishTime'] = int(task_attempt['mergeFinishTime'] / 1000) + if 'elapsedShuffleTime' in task_attempt: + task_attempt['elapsedShuffleTime'] = int(task_attempt['elapsedShuffleTime'] / 1000) + if 'elapsedMergeTime' in task_attempt: + task_attempt['elapsedMergeTime'] = int(task_attempt['elapsedMergeTime'] / 1000) + if 'elapsedReduceTime' in task_attempt: + task_attempt['elapsedReduceTime'] = int(task_attempt['elapsedReduceTime'] / 1000) + + # Find the container from container app list and merge + for container in containers_list: + if container['containerId'] == task_attempt['containerId']: + task_attempt['allocatedMB'] = container['allocatedMB'] + 
task_attempt['allocatedVCores'] = container['allocatedVCores'] + break + + # Merge the counters document + + path = '/ws/v1/history/mapreduce/jobs/{0}/tasks/{1}/attempts/{2}/counters'.format(job_id, task, + task_attempt['taskAttemptId']) + json_resp_tasks1 = http_request(location, port, path, scheme='http') + task_attempt_counter_group = json_resp_tasks1['jobTaskAttemptCounters']["taskAttemptCounterGroup"] + task_attempt_counter = {} + for group in task_attempt_counter_group: + counter_list = group["counter"] + for counter in counter_list: + task_attempt_counter[convert_camelcase(counter["name"], "_")] = counter["value"] + + task_attempt.update(task_attempt_counter) + + task_attempt_document.append(task_attempt) + + task_attempt_document_job.append(task_attempt_document) + + return task_attempt_document_job + except Exception as e: + logger.debug('Unable to get task details => ' + traceback.format_exc().splitlines()[-1]) + return None + + +def get_containers_node(host, port, app_id): + + location = host + port = port + path = '/ws/v1/cluster/nodes'.format(app_id) + nodes_json = http_request(location, port, path, scheme='http') + if nodes_json is None: + return None + + nodes_list = nodes_json["nodes"]["node"] + return nodes_list + + +def get_num_node_to_tasks(job_id, task_list): + + node_to_tasks = {"jobId": job_id, "tasksTotal": len(task_list)} + for task in task_list: + job_type = task["type"] + node = task["nodeHttpAddress"] + # node_to_tasks[node] = {"maps":0, "reduces":0} + if job_type == "MAP": + if node not in node_to_tasks: + node_to_tasks[node] = {'maps': 1, "reduces": 0} + else: + node_to_tasks[node]['maps'] += 1 + elif job_type == "REDUCE": + if node not in node_to_tasks: + node_to_tasks[node] = {'maps': 0, "reduces": 1} + else: + node_to_tasks[node]['reduces'] += 1 + + return node_to_tasks + + +def group_tasks_by_start_time(job_id, task_list): + + tasks_by_start_time = {"jobId": job_id} + + for task in task_list: + job_type = task["type"] + if job_type 
def get_unix_timestamp(date_text):
    """Convert a string like 'Sat, 06 Jan 2018 10:30:00 UTC' to epoch seconds.

    The last whitespace-separated token is a timezone name understood by
    pytz; the remainder must match '%a, %d %b %Y %H:%M:%S'.
    """
    import calendar  # local import: leaves the module's import block unchanged

    text, tz_name = date_text.rsplit(' ', 1)
    zone = timezone(tz_name)
    aware = zone.localize(datetime.strptime(text, "%a, %d %b %Y %H:%M:%S"))
    # BUG FIX: the original used time.mktime(aware.timetuple()).  mktime()
    # interprets the tuple in the machine's LOCAL timezone, so the epoch
    # was host-dependent (and DST-skewed) for any non-local input zone.
    # calendar.timegm() over utctimetuple() is timezone-independent.
    return int(calendar.timegm(aware.utctimetuple()))


def convert_camelcase(str_to_convert, separator):
    """Convert a separator-delimited name to lowerCamelCase.

    e.g. convert_camelcase('BYTES_READ', '_') -> 'bytesRead'.
    Assumes a non-empty input string (an empty one would raise IndexError,
    as in the original).
    """
    collapsed = ''.join(ch for ch in str_to_convert.title() if ch != separator)
    return collapsed[0].lower() + collapsed[1:]


def write_json_to_file(data, outfile):
    """Append *data* as one JSON line to *outfile*.

    NOTE(review): writes only when the file already exists -- a missing
    file makes this a silent no-op.  That looks deliberate (something
    external presumably creates/rotates the file); confirm before
    "fixing" the guard away.
    """
    if os.path.exists(outfile):
        with open(outfile, 'a') as ofile:
            json.dump(data, ofile)
            ofile.write("\n")
"""Initializes variables from conf files.""" + for children in cfg.children: + if children.key == INTERVAL: + self.interval = children.values[0] + elif children.key == YARN_NODE: + self.yarn_node = children.values[0] + elif children.key == RESOURCE_MANAGER_PORT: + self.resource_manager = children.values[0] + + def remove_dot(self, doc, field): + """Function to remove dots in the field""" + new_field = '_' + field.split('.')[0] + '_' + field.split('.')[1].lower() + doc[new_field] = doc.pop(field) + + + def get_yarn_stats(self): + """Function to get yarn statistics""" + location = self.yarn_node + port = self.resource_manager + path = "/jmx?qry=Hadoop:service=ResourceManager,name={}".format('JvmMetrics') + json_yarn_node = http_request(location, port, path, scheme='http') + if json_yarn_node is not None: + json_yarn_node = json_yarn_node['beans'] + json_yarn_node[0]['time'] = int(time.time()) + json_yarn_node[0]['_documentType'] = "yarnStats" + 'JvmMetrics' + else: + return [] + hostname = json_yarn_node[0]['tag.Hostname'] + + for name in ['RpcActivityForPort8031', 'RpcActivityForPort8032', \ + 'RpcActivityForPort8033', 'RpcActivityForPort8025', \ + 'RpcDetailedActivityForPort8050', 'QueueMetrics,q0=root', 'ClusterMetrics']: + path = "/jmx?qry=Hadoop:service=ResourceManager,name={}".format(name) + json_doc = http_request(location, port, path, scheme='http') + if json_doc is None: + continue + try: + if json_doc['beans'] == []: + continue + doc = json_doc['beans'][0] + except KeyError as error: + collectd.error("Plugin yarn_stats: Error ", error) + return None + if 'tag.Hostname' not in doc: + doc['tag.Hostname'] = hostname + doc['_tag_hostname'] = doc.pop('tag.Hostname') + doc['time'] = int(time.time()) + if 'Rpc' in name: + doc['_documentType'] = "yarnStats" + 'RpcActivity' + else: + doc['_documentType'] = "yarnStats" + name.split(',')[0] + for field in doc.keys(): + if '.' 
in field: + self.remove_dot(doc, field) + + json_yarn_node.append(doc) + return json_yarn_node + + @staticmethod + def add_common_params(namenode_dic, doc_type): + """Adds TIMESTAMP, PLUGIN, PLUGIN_INS to dictionary.""" + hostname = gethostname() + timestamp = int(round(time.time())) + + namenode_dic[HOSTNAME] = hostname + namenode_dic[TIMESTAMP] = timestamp + namenode_dic[PLUGIN] = 'yarn' + namenode_dic[ACTUALPLUGINTYPE] = 'yarn' + namenode_dic[PLUGINTYPE] = doc_type + + @staticmethod + def dispatch_data(doc): + """Dispatches dictionary to collectd.""" + collectd.info("Plugin Yarn_Stats: Values: %s" %(doc)) # pylint: disable=E1101 + dispatch(doc) + + + def collect_data(self): + """Collects all data.""" + namenode_dics = self.get_yarn_stats() + for doc in namenode_dics: + self.add_common_params(doc, doc['_documentType']) + self.dispatch_data(deepcopy(doc)) + + def read(self): + self.collect_data() + + def read_temp(self): + """ + Collectd first calls register_read. At that time default interval is taken, + hence temporary function is made to call, the read callback is unregistered + and read() is called again with interval obtained from conf by register_config callback. + """ + collectd.unregister_read(self.read_temp) # pylint: disable=E1101 + collectd.register_read(self.read, interval=int(self.interval)) # pylint: disable=E1101 + +namenodeinstance = YarnStats() +collectd.register_config(namenodeinstance.read_config) # pylint: disable=E1101 +collectd.register_read(namenodeinstance.read_temp) # pylint: disable=E1101