Linux ip-148-66-134-25.ip.secureserver.net 3.10.0-1160.119.1.el7.tuxcare.els10.x86_64 #1 SMP Fri Oct 11 21:40:41 UTC 2024 x86_64
/usr/lib/fm-agent/library/
Name                     Size       Permission
__pycache__              [ DIR ]    drwxr-xr-x
__init__.py              0 B        -rw-r--r--
agent.py                 96.8 KB    -rw-r--r--
agent_exceptions.py      110 B      -rw-r--r--
agent_util.py            8.58 KB    -rw-r--r--
aggregator.py            14.89 KB   -rw-r--r--
anomaly.py               2.19 KB    -rw-r--r--
blacklister.py           809 B      -rw-r--r--
container_discovery.py   3.3 KB     -rw-r--r--
display.py               2.06 KB    -rw-r--r--
forticlient_helper.py    2.59 KB    -rw-r--r--
inspector.py             15.7 KB    -rw-r--r--
iperf3.py                2.12 KB    -rw-r--r--
log_matcher.py           4.27 KB    -rw-r--r--
maintenance.py           3.61 KB    -rw-r--r--
pickle_database.py       1.28 KB    -rw-r--r--
plugin_driver.py         4.78 KB    -rw-r--r--
plugin_manager.py        11.04 KB   -rw-r--r--
process_manager.py       851 B      -rw-r--r--
progress_printer.py      837 B      -rw-r--r--
result_queue.py          1.99 KB    -rw-r--r--
schedule.py              3.19 KB    -rw-r--r--
threshold.py             1.5 KB     -rw-r--r--
Code Editor : agent.py
from agent_util import execute_command, which
from datetime import datetime, timedelta
from inspector import Inspector, get_fqdn, get_server_name
from agent_exceptions import NoAgentSectionHeaderException, NoManifestFileException
from process_manager import ProcessManager
from os.path import basename, exists, isdir, isfile, join
from pickle_database import PickleDatabase
from plugin_manager import PluginManager
from pprint import pprint
from progress_printer import ProgressPrinter
from result_queue import ResultQueue
from schedule import Schedule
from sys import exit

# In case of Python3
try:
    import StringIO
except:
    import io as StringIO

import aggregator
import calendar
import container_discovery
import csv
import difflib
import display
import fcntl
import p_importlib
import logging
import logging.handlers
import optparse
import os
import random
import re
import subprocess
import sys
import tempfile
import time
import traceback
import types
import socket

from blacklister import PluginBlacklister

try:
    import six
except:
    # Legacy support for Python 2.4
    class Six:
        PY2 = True
    six = Six()

# In case of python 3
try:
    import ConfigParser as configparser
except:
    import configparser

try:
    import json
except ImportError:
    try:
        import simplejson as json
        # it's possible that we may not need json for the action that we're taking.
        # for example, for the rpm post install script, on a python version that
        # doesn't have json, we'll get this far in the code. but the post
        # install doesn't use json, so we're fine
    except ImportError:
        json = None

# Import a SHA function, either from hashlib for newer Python's or sha for older
try:
    import hashlib
    sha_func = hashlib.sha1
except:
    import sha
    sha_func = sha.new

# Backport subprocess.check_output for Python versions < 2.7
# Adapted from http://stackoverflow.com/questions/4814970/subprocess-check-output-doesnt-seem-to-exist-python-2-6-5
if "check_output" not in dir(subprocess):
    # duck punch it in!
    def f(*popenargs, **kwargs):
        if 'stdout' in kwargs:
            raise ValueError('stdout argument not allowed, it will be overridden.')
        process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
        output, unused_err = process.communicate()
        retcode = process.poll()
        return output
    subprocess.check_output = f

try:
    # Python 2.x
    import urlparse
except:
    import urllib.parse as urlparse
    import urllib.request as urlrequest


class Agent(object):
    CUSTOM = "custom"
    DEFAULT_LOG_LEVEL = 'INFO'
    MAX_IMPORT_FILES = 20

    def safe_to_start_agent(self, timeout=2, sleep_time=10, counter=3):
        "Check to see if it's safe to start up the agent"
        log = logging.getLogger("lock_mgr")

        # Safe if there are no other instances running
        if not os.path.exists(self.pid_file):
            self.log.debug("No existing PID file found, proceeding to run")
            return True

        # There's an existing PID file, so let's see if it's still active
        try:
            pid, timestamp = open(self.pid_file).read().strip().split(':')
            pid = int(pid)
            timestamp = int(timestamp)
        except:
            # If we couldn't read it, assume that the other instance just exited - should be safe to continue
            self.log.critical("Error reading existing PID file: %s" % traceback.format_exc())
            return True

        # See if the process is still running
        try:
            os.getpgid(pid)
        except OSError:
            # It's exited, safe to proceed
            return True

        try:
            import pwd
            username = pwd.getpwuid(os.stat("/proc/%d" % pid).st_uid)[0]
            psout = execute_command("ps -o cmd= %d" % pid)[1].lower()
            if username != self.user or "python" not in psout or ("%s_agent" % self.brand) not in psout:
                self.remove_pid_file()
                return True
        except:
            pass

        # Process is running, see how old it is
        if timeout and (time.time() - timestamp) / 60. > timeout:
            self.log.critical("Found stale agent process %s - killing" % pid)
            # Other process is too old, kill it off and start a new one
            os.kill(pid, 9)
            return True

        # Checking if the process is to uninstall, in which case, kill the running process.
        parser = optparse.OptionParser()
        options, args = self.parse_arguments(parser)
        if options.uninstall and self.user != 'root':
            self.log.critical('Uninstalling. Killing all process from the username %s ' % self.user)
            manager = ProcessManager()
            pids = manager.filter_non_pid_process(os.listdir('/proc'))
            pids = manager.get_process_from_user(pids, self.user)
            self.log.critical('Found pids %s ' % ' '.join(pids))
            for pid in pids:
                os.kill(int(pid), 9)
            return True

        # Other process should still be running, we bail for now
        if counter != 0:
            self.current_agent_delay += 10
            counter -= 1
            self.log.critical("Found existing agent process %s, sleeping for %s and checking %s more times if safe to start."
                              % (pid, sleep_time, counter))
            time.sleep(sleep_time)
            return self.safe_to_start_agent(timeout, counter=counter)
        else:
            self.log.critical("Found existing agent process %s, exiting to wait for it to finish" % pid)
            return False

    def write_pid_file(self):
        "Create a new PID file to track our instance"
        pid = os.getpid()
        now = int(time.time())
        f = open(self.pid_file, 'w')
        f.write("%s:%s" % (pid, now))

    def remove_pid_file(self):
        "Remove an old PID file to clean up on the way out"
        # Need to check to see if it exists to avoid a problem on package uninstall
        if os.path.exists(self.pid_file):
            os.remove(self.pid_file)

    # removes the agent from the system
    def uninstall(self, aggregator_client, remove_instance=False):
        indent = 1
        pp = ProgressPrinter("Notifying %s of uninstall" % self.brand, indent=indent)
        success = aggregator_client.notify_of_uninstall(remove_instance)
        if success:
            pp.finish()
        else:
            pp.finish("ERROR CONNECTING")

        # Remove logging and DB directories. We'll leave CUSTOM_PLUGIN_DIR in tact in case they're
        # uninstalling and reinstalling.
        pp = ProgressPrinter("Removing %r directory" % self.log_dir, indent=indent)
        os.system("rm -rf %s %s %s" % (self.db_dir, self.log_dir, self.config_dir))
        pp.finish()

        indent = 1
        ProgressPrinter("\n\nUninstalling %s\n" % self.pkg_dir, section=True)
        pp.finish()
        print(("\nUninstall of %s complete\n" % self.pkg_dir))

    def get_manifest(self):
        '''
        Get the manifest configuration if it exists. Also, throw a deprecation
        warning if the the manifest does not conform to the new-style format
        (It must have an [agent] section heading).
        '''
        manifest = configparser.ConfigParser()
        try:
            manifest_file = manifest.read(self.manifest_file)
            if not manifest_file:
                raise NoManifestFileException('No manifest file found')
            if not manifest.has_section('agent'):
                raise NoAgentSectionHeaderException(
                    'Using a manifest file without the section heading '
                    '"[agent]" is deprecated; please add this heading to '
                    'the file. Example:'
                    '''
                    [agent]
                    customer_key = customerkey
                    server_group = 123
                    '''
                )
        except (configparser.MissingSectionHeaderError, NoAgentSectionHeaderException):
            self.log.warn(str(traceback.format_exc()))
            if sys.version_info[0] == 3:
                amended_manifest_file = StringIO.StringIO(
                    '[agent]\n' + open(self.manifest_file, 'r').read()
                )
            else:
                amended_manifest_file = StringIO.StringIO(
                    '[agent]\n' + open(self.manifest_file, 'r').read().decode('utf-8')
                )
            manifest.readfp(amended_manifest_file)
        except NoManifestFileException:
            self.log.info(str(traceback.format_exc()))
        return manifest

    def write_config(self, manifest):
        '''
        Create/update the config file with the settings from the manifest.
        Return the config.
        '''
        new_config = configparser.ConfigParser()

        # Get the existing config file (if it exists) for creating a diff. See
        # below.
        old_config_lines = None
        if os.path.exists(self.config_file):
            self.log.info('Existing config file found')
            old_config_file = open(self.config_file, 'rb')
            old_config_lines = old_config_file.readlines()
            old_config_file.close()

            # Copy old config settings into the new config
            old_config = configparser.ConfigParser()
            old_config.read(self.config_file)
            new_config = self.copy_config_settings(old_config, new_config)

        # Copy the manifest settings into the new config
        new_config = self.copy_config_settings(manifest, new_config)

        # Ensure the required settings are set.
        if not new_config.has_section('agent'):
            new_config.add_section('agent')
        if not new_config.has_option('agent', 'aggregator_url'):
            new_config.set('agent', 'aggregator_url', self.agg_url)
        new_config.set('agent', 'version', self.version)

        if 'plugin_blacklist' in new_config.options('agent'):
            original_plugins = new_config.get('agent', 'plugin_blacklist')
        else:
            original_plugins = []
        updated_plugins = self._blacklister.update_list(original_plugins)
        if updated_plugins:
            new_config.set('agent', 'plugin_blacklist', updated_plugins)

        proxies = urlrequest.getproxies()
        if not new_config.has_section('agent_proxy') and proxies:
            agg_url = new_config.get('agent', 'aggregator_url')
            try:
                agg_url_option = urlparse.urlparse(agg_url)
                if agg_url_option.scheme:
                    agg_hostname = agg_url_option.hostname
                else:
                    agg_hostname = agg_url_option.path
                if not urlrequest.proxy_bypass(agg_hostname):
                    new_config.add_section('agent_proxy')
                    for key in ['https', 'http']:
                        p_url = proxies.get(key, None)
                        if p_url is not None:
                            new_config.set('agent_proxy', key, p_url.strip('/'))
            except:
                err = sys.exc_info()[1]
                error = str(err)
                self.log.error('Install proxy error: {}'.format(error))

        new_config_file = open(self.config_file, 'w')
        new_config.write(new_config_file)
        new_config_file.close()
        os.system("chmod 640 %s" % self.config_file)

        if old_config_lines is not None:
            # Create a diff of the old config vs new config.
            differ = difflib.Differ()
            diff_lines = differ.compare(old_config_lines, open(self.config_file, 'r').readlines())
            diff_lines = list(diff_lines)
            changes = [line for line in diff_lines if line.startswith('+ ') or line.startswith('- ')]
            if len(changes):
                self.log.info('Config file overwritten')
                self.log.debug('Config diff:\n%s', ''.join(diff_lines))
            else:
                self.log.info('No change to config file')
        else:
            self.log.info('Created new config file: %s', self.config_file)

        return new_config

    def copy_config_settings(self, original, destination):
        """
        Copy settings from the original to the destination, overwriting
        destination's settings if they already exist.
        """
        for section in original.sections():
            if not destination.has_section(section):
                destination.add_section(section)
            for option, value in original.items(section):
                destination.set(section, option, value)
        return destination

    def install(self, agg_url, version, server_key, customer_key):
        self.log.info('Begining installation')
        if self.is_installed:
            print('Agent already installed')
            self.log.info('Agent already installed')
            return

        # Make dirs for logging, DB, config, and plugins.
        dirs = (self.log_dir, self.db_dir, self.config_dir, self.custom_plugin_dir)
        os.system('mkdir -p %s %s %s %s' % dirs)
        self.log.info('Created directories: %s %s %s %s' % dirs)

        # Create a new config from the manifest (if it exists).
manifest = self.get_manifest() config = self.write_config(manifest) proxy_config = {} if config.has_section('agent_proxy'): proxy_config = config['agent_proxy'] aggregator_client = aggregator.Client( agg_url, version, server_key, customer_key, proxy_config=proxy_config ) # Check for a custom aggregator URL, and set it in the client if present try: agg_url = config.get("agent", "aggregator_url") print(("Using manifest file aggregator for initial handshake: %s" % agg_url)) self.log.info("Using manifest file aggregator for initial handshake: %s" % agg_url) aggregator_client.agg_url = agg_url except: pass pp = ProgressPrinter('\nHandshaking with %s servers' % self.brand, indent=1) agent_settings = dict((option, value.strip('\'"')) for option, value in config.items('agent')) if config.has_section('agent_proxy'): aggregator_client.proxy_config = config['agent_proxy'] if config.has_section('attributes'): server_attributes = dict((option, value) for option, value in config.items('attributes')) else: server_attributes = {} try: success, server_key, found_server, error, log_level = aggregator_client.handshake(self.get_all_ips(), agent_settings, server_attributes) except: print("\n\nThere was an error in the initial handshake with the aggregator, please") print("check your aggregator URL, and ensure you have connectivity to retrieve:\n") for url in agg_url.split(","): print((" %s\n" % os.path.join(url, "v2/hello"))) self.log.error("Error in initial handshake: %s" % traceback.format_exc()) sys.exit() if not server_key or not found_server: print("Handshake failed: %s" % error) self.log.error("Handshake failed: %s" % error) sys.exit() self.log.debug('%s, %s, %s, %s, %s' % (success, server_key, found_server, error, log_level)) if log_level: self.db['log_level'] = log_level # Install remote countermeasures plugins, if specfied if "enable_countermeasures" in config.options("agent") and \ config.get("agent", "enable_countermeasures").lower() == "true" and \ "countermeasures_remote_plugins" in config.options("agent"): for url in config.get("agent", "countermeasures_remote_plugins").split(","): cmd = "%s %s/countermeasure.py install_plugins --url %s" % (sys.executable, self.bin_dir, url.strip()) os.system(cmd) if success: pp.finish() else: self.log.critical('Installation failed:\n%s', error) pp.finish('ERROR CONNECTING: %s' % error) if success and server_key: config.set('agent', 'server_key', server_key) config.write(open(self.config_file, 'w')) if found_server: print(("""Installation of %s complete. Your server will now sync automatically with the %s ControlPanel. """ % (self.pkg_dir, self.brand))) self.log.info('Agent will automatically sync with aggregator') else: padding = int(80/2 - (len(server_key)/2)) server_key = (" " * padding) + server_key print((""" Installation of %s complete. Please copy and paste the following server key into the %s ControlPanel for your server: %s""" % (self.pkg_dir, self.brand, server_key))) self.log.warn('The server key must be manually entered into the ' 'Control Panel before agent will begin syncing') else: print((""" Installation of %s had an error (%s). The %s is installed but it cannot sync correctly. Please contact %s and send them the log file at %s """ % (self.pkg_dir, error, self.pkg_dir, self.brand, self.log_file))) self.log.critical('Aggregator sync failed:\n%s', error) self.migrate_config() # This is used for collecting all the IPs associated with the machine, to be # passed to the aggregator through aggregator.Client.handshake(). 
The # aggregator will then check all of these IPs in sequence to try to find a # matching server. def get_all_ips(self): ips = [] ifconfig_path = which("ifconfig") ifconfig_cmd = ifconfig_path # special logig for solaris b/c if the way ifconfig syntax is changed if 'sunos' in sys.platform or 'aix' in sys.platform: ifconfig_cmd = ifconfig_path + ' -a' if 'hp-ux' in sys.platform: netstat = which("netstat") if netstat: code, output = execute_command("%s -in" % netstat) if code == 0: for l in output.split('\n'): if l.lower().startswith('name') or not l or l == '': continue line = l.split() ips.append(line[3]) elif ifconfig_path and 'hp-ux' not in sys.platform: code, output = execute_command(ifconfig_cmd) if code == 0: if sys.platform in ('freebsd', 'darwin'): ips = re.findall(r'inet6? (.+?)\s', output) ips = [ip.strip().split("%")[0] for ip in ips] else: ips = [x.strip('addr:') for x in re.findall('inet6? (.+?)\s', output)] else: ip_addr_path = which("ip") code, output = execute_command("%s addr show" % ip_addr_path) ips = [x for x in re.findall(r'inet6? (.+?)\s', output)] ips = [x for x in ips if x] # Remove any stray whitespace and CIDR notation from IPv6 addresses # AIX reports an inet6 address like '::1%1/64' - account for that. ips = [ip.strip().split("/")[0].split('%')[0] for ip in ips] if "1" in ips: ips[ips.index("1")] = "::1" # If that didn't work, get the IP address by making an outbound # connection with the aggregator. if not ips: self.log.warning('Unable to retrieve IP address(es) locally, ' 'contacting aggregator') aggregator_client = aggregator.Client(self.agg_url, self.version, proxy_config=self.proxy_config) try: ips = [aggregator_client.get_local_ip()] except Exception as e: self.log.error('IP address lookup failure: {}'.format(e)) ips = [] if not ips: self.log.error('Unable to determine IP address(es)') else: self.log.debug('IP addresses: %s', ips) return ips def get_old_style_config_properties(self, manfile): # Return with no error if the manifest file doesn't exist if not os.path.exists(manfile): return {} try: mf = open(manfile).read().strip().split('\n') return dict([list(map(str.strip, line.split("="))) for line in mf]) except: print("Error reading manifest file") return {} def _open_file(self, fname, mode='r+'): ofile = open(fname, mode) # Acquire lock locked = True for i in range(10): try: fcntl.flock(ofile, fcntl.LOCK_EX | fcntl.LOCK_NB) locked = False break except: time.sleep(1.0) if locked: self.log.exception("Could not acquire lock on %s" % fname) ofile.close() return None return ofile def get_metric_values(self): if exists(self.report_file): # Read current metrics csvfile = self._open_file(self.report_file) if not csvfile: return {} unique_values = {} try: csv_reader = csv.reader(csvfile) for (textkey, value, timestamp) in csv_reader: timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") value = float(value) unique_values["%s:%s" % (textkey, timestamp.strftime("%Y%m%d%H%M"))] = [textkey, value, time.mktime(timestamp.timetuple())] except: self.log.error("Unable to parse custom metric file") unique_values = {} unique_values = list(unique_values.values()) unique_values.sort(key=lambda v: v[2]) custom_values = {} for textkey, value, timestamp in unique_values: if textkey not in custom_values: custom_values[textkey] = [[value, timestamp]] else: custom_values[textkey].append([value, timestamp]) # Remove all synced metrics csvfile.seek(0) csvfile.truncate() # Release lock fcntl.flock(csvfile, fcntl.LOCK_UN) csvfile.close() return custom_values else: return {} 
def get_registered_metrics(self): if exists(self.register_file): # Read current metrics csvfile = self._open_file(self.register_file) if not csvfile: return {} csvreader = csv.reader(csvfile) try: metrics = dict([(row[0], row[1]) for row in csvreader]) except Exception: self.log.exception("Error reading custom metric register file") metrics = {} # Remove all synced metrics csvfile.seek(0) csvfile.truncate() # Release lock fcntl.flock(csvfile, fcntl.LOCK_UN) csvfile.close() return metrics else: return {} def get_existing_metrics(self): existing_tkeys = [] for sr_id, schedule in list(self.db["schedules"].items()): tkey = "%s.%s" % (schedule.plugin_textkey, schedule.resource_textkey) if tkey not in existing_tkeys: existing_tkeys.append(tkey) return existing_tkeys def ignore_metric(self, plugin_textkey, resource_textkey): if plugin_textkey == 'com.pnp-hcl.dominostats': if resource_textkey.startswith('Mem.PID.'): return True return False def process_imports(self, config): req_top_keys = ["plugin_textkey", "plugin_category_name"] req_metric_keys = ["textkey", "value", "unit", "timestamp"] req_incident_keys = ["textkey", "description", "action", "timestamp"] existing_metrics = self.get_existing_metrics() self.log.info("Processing incoming import files") new_metrics = {} new_values = {} custom_incidents = [] import_dirs = [self.custom_import_dir] additional_dirs = config.has_option("agent", "metric_incoming_directory") and\ config.get("agent", "metric_incoming_directory") or None if additional_dirs: import_dirs.extend(additional_dirs.split(",")) max_files = self.MAX_IMPORT_FILES max_override = config.has_option("agent", "max_incoming_files_override") and\ config.get("agent", "max_incoming_files_override") or None if max_override: max_files = int(max_override) files = [] for directory in import_dirs: if not isdir(directory): continue if len(files) >= max_files: break self.log.info("Looking in %s", directory) for f in os.listdir(directory): if len(files) >= max_files: break if isfile(join(directory, f)): files.append(join(directory, f)) for full_path in files: fname = basename(full_path) # Check if we can delete this file when we're done if not os.access(full_path, os.W_OK): self.log.error("Can not delete %s so will not process.", full_path) continue f = open(full_path, "r+") try: self.log.info("Processing %s", full_path) j = json.loads(f.read()) f.close() for req in req_top_keys: if req not in list(j.keys()): logging.error("Can not process file %s! Missing required key: %s", fname, req) # TODO: Log full file here? continue metrics = j.get('metrics', []) for m in metrics: for req in req_metric_keys: if req not in list(m.keys()): logging.error("Can not process metric! 
Missing required key: %s", req) pprint(m) continue if self.ignore_metric(j["plugin_textkey"], m["textkey"]): continue try: try: unix_timestamp = int(m["timestamp"]) except Exception: timestamp = datetime.strptime(m["timestamp"], "%Y-%m-%d %H:%M:%S") unix_timestamp = calendar.timegm(timestamp.timetuple()) except Exception: self.log.error("Could not process timestamp %s for metric %s", m["timestamp"], m["textkey"]) continue new_value = (m["value"], unix_timestamp) tkey = "%s.%s" % (j["plugin_textkey"], m["textkey"]) if tkey not in existing_metrics: if tkey in new_metrics: new_metrics[tkey].setdefault("first_values", []).append(new_value) else: label = m.get("label", None) if label is None: label = m["textkey"] new_metrics[tkey] = {"plugin_textkey": j["plugin_textkey"], "plugin_name": j["plugin_category_name"], "resource_textkey": m["textkey"], "label": label, "unit": m.get("unit", None), "first_values": [new_value]} else: new_values.setdefault(tkey, []).append(new_value) incidents = j.get('incidents', []) for incident in incidents: for req in req_incident_keys: if req not in list(incident.keys()): logging.error("Can not process incident! Missing required key: %s", req) pprint(incident) continue try: try: unix_timestamp = int(incident["timestamp"]) except Exception: timestamp = datetime.strptime(incident["timestamp"], "%Y-%m-%d %H:%M:%S") unix_timestamp = calendar.timegm(timestamp.timetuple()) except Exception: self.log.error("Could not process timestamp %s for incident %s", incident["timestamp"], incident["textkey"]) continue obj = {"plugin_textkey": j["plugin_textkey"], "resource_textkey": incident["textkey"], "timestamp": unix_timestamp, "description": incident["description"], "action": incident["action"]} if "match_key" in incident: obj["match_key"] = incident["match_key"] if "metadata" in incident: obj["metadata"] = incident["metadata"] custom_incidents.append(obj) # All done with this file, delete it os.remove(full_path) except Exception: if f.closed: f = open(full_path, f.mode) self.log.error("Error processing %s:", fname) # TODO: Can this be debug instead? f.seek(0) self.log.info(f.read()) self.log.error(traceback.format_exc()) self.log.error("Deleting file") f.close() os.remove(full_path) continue return new_metrics, new_values, custom_incidents def get_update_config(self): config = { 'fqdn': get_fqdn() } if os.path.exists(self.update_config_file): manfile = self._open_file(self.update_config_file) if not manfile: return config # Read current properties properties = self.get_old_style_config_properties(self.update_config_file) # Release lock and remove manfile.seek(0) manfile.truncate() fcntl.flock(manfile, fcntl.LOCK_UN) manfile.close() try: os.remove(self.update_config_file) except: pass return properties else: if self.is_fortisase_install: server_name = get_server_name() if server_name: config['server_name'] = server_name return config def __init__(self, brand, agg_url, version, user, bin_dir, lib_dir, pkg_dir, timeout, base_config_dir, base_custom_plugin_dir, base_data_dir, base_log_dir, acceptable_sync_delay): self.brand = brand self.agg_url = agg_url self.version = version self.user = user self.lib_dir = lib_dir self.bin_dir = bin_dir self.pkg_dir = pkg_dir self.tmp_dir = tempfile.gettempdir() self.metadata_rebuild_freq = 3600 # How often do we want to rebuild metadata (seconds) self.is_root = os.getuid() == 0 or os.geteuid() == 0 self.acceptable_sync_delay = acceptable_sync_delay # XXX I think these dir settings might need to be moved back into the # configs. 
# These dirs and files are managed by the script, not the package. # Need to be created by the script by --install, and removed by --uninstall. self.db_dir = os.path.join(base_data_dir, self.pkg_dir) self.db_file = join(self.db_dir, "%s.db" % self.pkg_dir) self.log_dir = os.path.join(base_log_dir, self.pkg_dir) self.log_file = join(self.log_dir, "%s.log" % self.pkg_dir) self.config_dir = os.path.join(base_config_dir, self.pkg_dir) self.config_file = join(self.config_dir, "%s_agent.cfg" % self.brand) self.custom_plugin_dir = os.path.join(base_custom_plugin_dir, self.pkg_dir) self.countermeasures_custom_plugin_dir = os.path.join(self.custom_plugin_dir, "countermeasures") self.custom_import_dir = os.path.join(self.custom_plugin_dir, 'incoming') self.manifest_file = os.path.join(base_config_dir, "%s-agent-manifest" % self.brand) data_dir = os.path.join(base_data_dir, self.pkg_dir) self.pid_file = os.path.join(data_dir, 'agent.pid') self.update_config_file = os.path.join(base_data_dir, self.pkg_dir, 'update-config') # Plugin textkey for custom metrics specified by the user as well as register and report files if 'freebsd' in sys.platform.lower(): self.register_file = os.path.join(lib_dir, 'register') self.report_file = os.path.join(lib_dir, 'report') elif 'darwin' == sys.platform.lower(): self.register_file = os.path.join(self.custom_plugin_dir, 'register') self.report_file = os.path.join(self.custom_plugin_dir, 'report') else: self.register_file = os.path.join(base_data_dir, self.pkg_dir, 'register') self.report_file = os.path.join(base_data_dir, self.pkg_dir, 'report') # See if we've been installed - the BIN_DIR directory neeeds to exist, and then we need to # make sure there is a server_key in the config file self.is_installed = True self.has_dem = False self.dem_port = 'demservice' self.update_service_port = 'updateservice' self.ipcPath = '/tmp/com.fortinet.fortimonitor' self.auto_update = False self.scheduled_update = None self.is_fortisase_install = False self.proxy_config = None try: if not exists(self.bin_dir): raise Exception('No bin directory') if not os.path.exists(self.config_file): raise Exception('No config file {}'.format(self.config_file)) config_file = configparser.ConfigParser() config_file.read(self.config_file) if config_file.has_section('agent_proxy'): self.proxy_config = config_file['agent_proxy'] if 'darwin' == sys.platform: if config_file.has_option('dem', 'enabled'): self.has_dem = config_file.get('dem', 'enabled').lower() == 'true' if config_file.has_option('dem', 'server_port'): self.dem_port = config_file.get('dem', 'server_port') if config_file.has_option('agent', 'updateservice.port'): self.update_service_port = config_file.get('agent', 'updateservice.port') if config_file.has_option('agent', 'ipc_path'): self.ipcPath = config_file.get('agent', 'ipc_path') if config_file.has_option('agent', 'auto_update'): self.auto_update = config_file.get('agent', 'auto_update').lower() == 'true' if config_file.has_option('agent', 'scheduled_update'): self.scheduled_update = config_file.get('agent', 'scheduled_update') if config_file.has_option('agent', 'handshake_type'): if 'forticlient' == config_file.get('agent', 'handshake_type').lower(): self.is_fortisase_install = True server_key = config_file.get("agent", "server_key") if not server_key: raise Exception('Missing server key') except Exception as e: sys.stderr.write('Initialize exception: {}'.format(e)) self.is_installed = False try: safe_counter = int(config_file.get('agent', 'safe_counter')) except: safe_counter = 3 # 
Custom OS block # Here we'll update sys.platform for all plugins to be able to use if "VMkernel" in os.uname(): sys.platform = "vmware" # Actual run of the agent delay. self.current_agent_delay = 0 self.set_up_logging() self.log = logging.getLogger(self.__class__.__name__) try: self.timeout = float(config_file.get('agent', 'startup_timeout')) except Exception: self.timeout = timeout if not self.safe_to_start_agent(timeout, counter=safe_counter): # Need to overwrite delete to avoid removing a pid self.__del__ = lambda: self.log.warning('Preventing pid file removal') self.log.warning( 'Exiting without running - other agent process already running' ) sys.exit(1) self.write_pid_file() self.db = self.open_db() # XXX This may be removed at a later date, when all agents' configs have # been migrated. self.migrate_config() self._blacklister = PluginBlacklister() self.log.info('Activity started') def migrate_config(self): ''' Update agent configs to use "[agent]" instead of "[AgentConfig]" as the main heading and "aggregator_url" instead of "agg_url" (in order to match the option in the manifest file). ''' if self.db['config_migrated']: self.log.info('Config is in the correct format') return config = configparser.ConfigParser() if config.read(self.config_file): config_has_changed = False if not config.has_section('agent'): config.add_section('agent') config_has_changed = True self.log.info('Added [agent] section to config') if config.has_section('AgentConfig'): for option, value in config.items('AgentConfig'): if option == 'agg_url': option = 'aggregator_url' config.set('agent', option, value) config.remove_section('AgentConfig') config_has_changed = True self.log.info('Copied deprecated [AgentConfig] section to [agent] and removed it from config') if config_has_changed: config_file = open(self.config_file, 'w') config.write(config_file) config_file.close() self.db['config_migrated'] = True def __del__(self): self.remove_pid_file() def set_up_logging(self): root_logger = logging.getLogger() if not os.path.isdir(self.log_dir): os.system('mkdir -p {}'.format(self.log_dir)) try: log_file = open(self.log_file, 'a') except IOError: print(('Cannot open log file %s: "%s"' % (self.log_file, str(traceback.format_exc())))) print('Logging to stderr instead') handler = logging.StreamHandler() else: log_file.close() handler = logging.handlers.RotatingFileHandler( self.log_file, 'a', maxBytes=5 * 1024**2, backupCount=5 ) handler.setFormatter(logging.Formatter('%(process)d) ' '%(asctime)s - ' '%(name)s - ' '%(levelname)s - ' '%(message)s')) root_logger.addHandler(handler) # We initialize the level to NOTSET here to make sure that all # logging inside PickleDatabase is captured because the root # logger's default log level is WARNING. See # https://docs.python.org/2/library/logging.html#logging.Logger.setLevel # for details). root_logger.setLevel(logging.NOTSET) db = self.open_db() try: log_level = getattr(logging, db['log_level'].upper()) except: log_level = getattr(logging, self.DEFAULT_LOG_LEVEL) root_logger.setLevel(log_level) def parse_arguments(self, parser): """ Return the options and arguments parsed from the parser. 
""" if self.is_installed: parser.add_option("--server-key", dest="server_key", action="store") parser.add_option("--rebuild-metadata", action="store_true", dest="rebuild_metadata", default=False) parser.add_option("--status", action="store_true", dest="status", default=False) parser.add_option("--stats", action="store_true", dest="stats", default=False) parser.add_option("--from-cron", action="store_true", dest="from_cron", default=False) parser.add_option("--aggregator", action="store", dest="aggregator") parser.add_option("--install", action="store_true", default=False, dest="install") parser.add_option("--uninstall", action="store_true", default=False, dest="uninstall") parser.add_option("--remove-instance", action="store_true", default=False, dest="remove_instance") parser.add_option("--customer-key", default=None, action="store", dest="customer_key") parser.add_option("--unpause", default=None, action="store_true", dest="unpause") # Docker parser.add_option("--list-containers", default=None, action="store_true", dest="list_containers") parser.add_option("--rebuild-container-metadata", default=None, action="store_true", dest="rebuild_container_metadata") options, args = parser.parse_args() return options, args def main(self): activityStart = datetime.now() if self.is_installed: db = self.db else: db = False server_key = None config = configparser.RawConfigParser() config.read(self.config_file) if self.is_installed and db and config != []: try: server_key = config.get("agent", "server_key") except: server_key = None try: self.agg_url = config.get('agent', 'aggregator_url') or self.agg_url except: self.agg_url = None # installed? just print out the server key usage = """%%prog [options] %s, server key: %s, aggregator endpoint: %s """ % (self.pkg_dir, server_key, self.agg_url) # not installed? 
print out the install usage else: usage = """sudo python %%prog --install [--customer-key=YOUR_CUSTOMER_KEY] %s""" % (self.pkg_dir,) parser = optparse.OptionParser(usage=usage) options, args = self.parse_arguments(parser) if options.status: plugins = PluginManager(db, self.config_file, join(self.lib_dir, "plugins"), self.custom_plugin_dir) display.status(self, server_key, db['schedules'], plugins) if options.stats: display.stats(db['schedules'], db['num_syncs'], db['last_sync']) if options.uninstall: aggregator_client = aggregator.Client(self.agg_url, self.version, server_key, proxy_config=self.proxy_config) self.uninstall(aggregator_client, options.remove_instance) exit() if not self.is_installed or options.install: if options.aggregator: self.agg_url = options.aggregator customer_key = options.customer_key or None if 'darwin' == sys.platform.lower(): dirs_to_create = [ (self.db_dir, None), (self.config_dir, None), (self.custom_plugin_dir, 0o777), (self.countermeasures_custom_plugin_dir, 0o777), (self.custom_import_dir, 0o777) ] for dir, perms in dirs_to_create: os.system('mkdir -p {}'.format(dir)) if perms: os.chmod(dir, perms) for rfile in [self.register_file, self.report_file]: with open(rfile, 'a+') as rf: pass if not os.path.isfile(rfile): self.log.warning('Installer did not create {}'.format(rfile)) self.install(self.agg_url, self.version, server_key, customer_key) return # Require at least one of these options valid_options = ["from_cron", "aggregator", "rebuild_metadata", "server_key", "unpause", "list_containers", "rebuild_container_metadata" ] option_given = False for valid_option in valid_options: if getattr(options, valid_option, None): option_given = True break if not option_given: msg = "%s Agent v%s, server key: %s, aggregator endpoint: %s" % (self.brand, self.version, server_key, self.agg_url) print(msg) self.log.info(msg) return # Support unpausing from the command line if options.unpause: print("Unpausing agent, will run as usual on next run") db["pause"] = None db.save() return # Docker cli commands if options.list_containers: if 'docker_containers' not in db or db['docker_containers'] == {}: print("No monitored containers") return containers = db['docker_containers'] print("Monitored Containers:\n") print("CONTAINER ID\tIMAGE\t\tCOMMAND\t\t\tSTATUS") for short_id, metadata in containers.items(): cont_image = metadata.get("Image", "?") cont_command = metadata.get("Command", "?") cont_status = metadata.get("Status", "?") print('%s\t%s\t"%s"\t%s' % (short_id, cont_image, cont_command[:20], cont_status)) return if options.rebuild_container_metadata: db['rebuild_container_metadata'] = True print("Metadata queued for rebuild") self.log.info("Container metadata rebuild queued") return requested_auto_update = False try: just_set_option_and_quit = False if options.server_key: just_set_option_and_quit = True key = options.server_key print(("Setting server key to %s" % key)) config.set("agent", "server_key", key) if options.aggregator: just_set_option_and_quit = True agg = options.aggregator print(("Setting aggregator endpoint to %s" % agg)) config.set("agent", "aggregator_url", agg) if just_set_option_and_quit: config.write(open(self.config_file, 'wb')) exit(0) # Linux agent should not run if executed as root if self.is_root and not options.rebuild_metadata: self.log.error("Linux agent should not run if executed as root") print("Linux agent should not run if executed as root") return server_key = config.get("agent", "server_key") aggregator_client = aggregator.Client( 
self.agg_url, self.version, server_key, proxy_config=self.proxy_config ) # should never be here if not server_key: print("No server key found, please re-install the agent.") exit(1) if self.has_dem: try: needs_schedules = False schedules_received = self._sendReceive('schedules-init', port=self.dem_port) if schedules_received is None or 'no' == schedules_received: needs_schedules = True if needs_schedules or self._agent_version_updated(db): self._init_dem_schedules(aggregator_client) except: pass plugins = PluginManager(db, self.config_file, join(self.lib_dir, "plugins"), self.custom_plugin_dir) wifi_info = self.get_dem_wifi_info() if wifi_info: plugins.add_dem_wifi_results(wifi_info) # Check on Countermeasures remote plugins update if "enable_countermeasures" in config.options("agent") and \ config.get("agent", "enable_countermeasures").lower() == "true" and \ "countermeasures_remote_plugins" in config.options("agent") and \ "countermeasures_refresh_plugins" in config.options("agent"): # See if we need to refresh refresh_cycle = int(config.get("agent", "countermeasures_refresh_plugins")) * 3600 if "countermeasures_last_refresh" not in db or \ time.time() - db["countermeasures_last_refresh"] > refresh_cycle: for url in config.get("agent", "countermeasures_remote_plugins").split(","): self.log.info("Refreshing CounterMeasures plugins from %s" % url) cmd = "%s %s/countermeasure.py install_plugins --url %s &" % (sys.executable, self.bin_dir, url.strip()) os.system(cmd) db["countermeasures_last_refresh"] = time.time() elif "countermeasures_last_refresh" in db: self.log.info("Waiting to refresh CM plugins in %d minutes" % ((db["countermeasures_last_refresh"] + refresh_cycle - time.time())/60)) # run all_plugins_start_time = datetime.now() results_to_send = [] custom_metrics = self.get_metric_values() new_import_metrics, new_import_values, custom_incidents = self.process_imports(config) # Create an anomalies container if it isn't already there if 'anomalies' not in db or db['anomalies'] == None: db['anomalies'] = {} for sr_id, schedule in list(db["schedules"].items()): schedule_tkey = "%s.%s" % (schedule.plugin_textkey, schedule.resource_textkey) # FIXME I gave the next check time a five-second leeway. There must be a better way! # Ignore schedule freuqency for custom metrics from JSON files and report.py calls leeway_time = 5 if schedule.plugin_textkey != self.CUSTOM and \ schedule.resource_textkey not in custom_metrics and \ schedule_tkey not in new_import_values and \ schedule.next_check_time > (all_plugins_start_time + timedelta(seconds=leeway_time)): self.log.info("%r too early to check", schedule) continue frequency = timedelta(seconds=schedule.frequency) current_agent_delay = timedelta(seconds=self.current_agent_delay) # Gave more leeway time to compensate the time sleeping if any. There must be a better way! 
schedule.next_check_time = (all_plugins_start_time + frequency - current_agent_delay) if schedule_tkey in new_import_values: scale = plugins.config.get(schedule_tkey, {}).get("scale", 1.0) for value, timestamp in new_import_values[schedule_tkey]: if value is not None: value *= scale if value is None: continue results_to_send.append((sr_id, timestamp, value)) anomalies = {} elif schedule.plugin_textkey == self.CUSTOM: if schedule.resource_textkey not in custom_metrics: continue scale = plugins.config.get(schedule.plugin_textkey, {}).get("scale", 1.0) for value, timestamp in custom_metrics[schedule.resource_textkey]: if value is not None: value *= scale if value is None: continue results_to_send.append((sr_id, timestamp, value)) anomalies = {} elif schedule.plugin_textkey not in plugins.plugins: # Likely a custom metric that didn't report in this period # TODO: Better way to do this? self.log.info("No custom value or plugin for %s", schedule_tkey) continue else: plugin_start_time = datetime.now() t0 = time.time() value, anomalies = schedule.check(plugins, db['anomalies'].get(schedule.id, {})) t1 = time.time() self.log.debug("%r returned %s in %.2f seconds" % (schedule, value, t1-t0)) if value is None: continue results_to_send.append((sr_id, time.mktime(plugin_start_time.timetuple()), value)) self.log.info('Running all plugins took %s', datetime.now() - all_plugins_start_time) # Add data to our queue db["result_queue"].update(results_to_send) # sync # If we're paused, we don't want to sync and will just exit here. if db["pause"]: if time.time() < db["pause"]: db.save() time_left = (db["pause"] - time.time()) / 60.0 self.log.info("Pause command recieved. Processing stopped. Process will resume in %.2f minutes." % time_left) return else: # We've paused as long as instructed, now set pause to None and resume with the sync db["pause"] = None db.save() self.log.info("Pause duration exceeded, unpausing the agent for the next run") return start_time = time.time() # do we need to resend and re-cache metadata? 
metadata = None fortisase_attributes = {} countermeasures_metadata = [] facts = None # let's just ensure that once a day they push, just in case something # gets out of sync lucky_day = random.randint(0, 1440) == 0 # See if we need to rebuild the metadata - performed every hour, or if specified by the --rebuild-metadata # command line option, or if the agent config file has changed since the last time we saw it rebuild_metadata = False if 'last_metadata_time' not in db: db['last_metadata_time'] = 0 if time.time() - db['last_metadata_time'] > self.metadata_rebuild_freq: rebuild_metadata = True if options.rebuild_metadata: rebuild_metadata = True if 'last_config_file_time' not in db: db['last_config_file_time'] = time.time() last_config_file_time = os.path.getmtime(self.config_file) if last_config_file_time > db['last_config_file_time']: rebuild_metadata = True db['last_config_file_time'] = last_config_file_time if rebuild_metadata: self.log.info("Rebuilding plugin metadata") db['last_metadata_time'] = time.time() if 'custom_plugin_url' in config.options('agent'): plugins.install_remote_plugins(config.get('agent', 'custom_plugin_url')) stale = plugins.is_metadata_stale() if stale or options.rebuild_metadata or lucky_day or not db["has_connected_with_aggregator"]: metadata = plugins.metadata if stale: self.log.info("metadata changed") elif options.rebuild_metadata: self.log.info("rebuilding metadata") elif lucky_day: self.log.info("randomly forcing metadata rebuild") elif not db["has_connected_with_aggregator"]: self.log.info("we've never pushed up metadata before") # If we're rebuilding metadata, also get the server facts facts = Inspector(self).get_all_facts(wifi_info) fortisase_attributes = self.get_fortisase_attributes() # If Countermeasures is enabled, rebuild Countermeasure plugin metadata # Dynamically load all available plugins, both in the default install directory # and the customer's custom directory countermeasures_metadata = [] if "enable_countermeasures" in config.options("agent") and \ config.get("agent", "enable_countermeasures").lower() == "true": for directory in (os.path.join(self.lib_dir, "countermeasures", "plugins"), self.countermeasures_custom_plugin_dir): if not os.path.exists(directory): continue sys.path.append(directory) for mod_name in os.listdir(directory): if mod_name.endswith(".py") and not mod_name.startswith("__"): try: mod = p_importlib.import_module(mod_name[:-3]) except: self.log.exception("Unable to import module %s" % mod_name) continue # Compute the hash of the plugin, being if sha_func: if six.PY2: hash = sha_func(open(os.path.join(directory, mod_name)).read()).hexdigest() else: hash = sha_func(open(os.path.join(directory, mod_name)).read().encode('utf-8')).hexdigest() else: hash = "" for name, obj in list(mod.__dict__.items()): if ((sys.version_info[0] == 3 and type(obj) == type) or (sys.version_info[0] == 2 and type(obj) == types.ClassType)) \ and name.endswith("Countermeasure"): try: plugin = obj() countermeasures_metadata.append({"textkey": plugin.textkey, "name": plugin.name, "author": plugin.author, "hash": hash, "description": plugin.description}) except: pass if mod_name.endswith('.json'): try: json_counter = open(os.path.join(directory, mod_name)) except: self.log.error('Unable to open %s' % os.path.join(directory, mod_name)) self.log.error(traceback.format_exc()) continue file_content = json_counter.read() if sha_func: hash = sha_func(file_content.encode('utf-8')).hexdigest() else: hash = "" json_counter.close() try: counter_data = 
json.loads(file_content) except Exception: self.log.error('%s file is not a valid json file to be read' % mod_name) self.log.error(traceback.format_exc()) continue required_fields = ['name', 'textkey', 'command', 'author'] existing_keys = counter_data.keys() success = True for key in required_fields: if key not in existing_keys or not counter_data.get(key): self.log.error('%s is missing from the countermeasure declaration in %s' % (key, mod_name)) success = False break if not success: continue textkey = counter_data.get('textkey') countermeasures_metadata.append({ 'textkey': textkey, 'name': counter_data.get('name'), 'author': counter_data.get('author'), 'hash': hash, 'description': counter_data.get('description') }) # Check if we can access Docker if "docker_supported" not in db or not db["docker_supported"]: can_access_docker = container_discovery.check_access() if can_access_docker == 'success': db["docker_supported"] = True self.log.info('Docker supported') elif can_access_docker == 'no-permission': self.log.info('Missing permission to access Docker socket') if "docker_supported" in db and db["docker_supported"]: db["rebuild_container_metadata"] = True # actually sync response = {} command_results = {} # Send results of a log request back to the agent if "log_request" in db and db["log_request"]: command_results["log_request"] = str(db["log_request"]) if "diagnostics" in db and db["diagnostics"]: command_results["diagnostics"] = str(db["diagnostics"]) if "socket_stats" in db and db["socket_stats"]: command_results["socket_stats"] = str(db["socket_stats"]) if "mtr" in db and db["mtr"]: command_results["mtr"] = str(db["mtr"]) auto_topo_scans = [] if "auto_topo_scans" in db: auto_topo_scans = db["auto_topo_scans"] try: anomalies_to_report = [] self.log.info('Syncing with aggregator: %d results, %d anomalies', len(results_to_send), len(anomalies_to_report)) if metadata: metadata_summary = dict((plugin_key, len(list(plugin_metadata.keys()))) for plugin_key, (_, plugin_metadata) in list(metadata.items())) self.log.debug('Metadata summary: %r', metadata_summary) force_send_schedules = False if db["num_syncs"] == 0 or db["schedules"] == {}: force_send_schedules = True if rebuild_metadata or db['sync_schedules']: force_send_schedules = True # We have a lot of results coming into the aggregator all at once from # various agents (every minute usually). We put an artificial random delay here before syncing # to stagger the results that come in. delay = random.randint(1, self.acceptable_sync_delay or 1) time.sleep(delay) # Pull results out of our queue to send # If single result time is set, we only want to send the latest result, and not anything else # in the queue. 
if db["single_result"]: if time.time() < db["single_result"]: result_data = db["result_queue"].pop_results(len(db["schedules"])) else: db["single_result"] = None result_data = db["result_queue"].pop_results() else: result_data = db["result_queue"].pop_results() # See if we have any queued discovered containers to send up discovered_containers = [] deleted_containers = [] MAX_CONTAINERS_SYNC = 20 if "discovered_containers" in db: container_queue = db["discovered_containers"] for i in range(min(len(container_queue), MAX_CONTAINERS_SYNC)): discovered_containers.append(container_queue.pop(0)) if "deleted_containers" in db: deleted_container_queue = db["deleted_containers"] for i in range(len(deleted_container_queue)): deleted_containers.append(deleted_container_queue.pop(0)) dem_results = self._getDemResults(db) try: # Set traceback limit 0 to include only the error message w/o the traceback sys.tracebacklimit=0 new_import_metrics = list(new_import_metrics.values()) if server_key: response = aggregator_client.sync(result_data, anomalies_to_report, metadata, countermeasures_metadata, facts, discovered_containers, deleted_containers, self.get_registered_metrics(), new_import_metrics, custom_incidents, self.get_update_config(), self.get_all_ips(), auto_topo_scans, force_send_schedules, command_results, dem_enabled=self.has_dem, dem_service_results=dem_results, fortisase_attributes=fortisase_attributes) db["log_request"] = None db["diagnostics"] = None db["socket_stats"] = None db["mtr"] = None db["auto_topo_scans"] = [] db['sync_schedules'] = None dem_updates = { 'icmp_server_resources' : response.get('icmp_server_resources', {}), 'monitor_schedules' : response.get('monitor_schedules', {}), 'traceroutes' : response.get('traceroutes', []), 'traceroute_checks' : response.get('traceroute_checks', {}) } self._updateDEMServiceSchedules(dem_updates) else: self.log.info("No server_key found, skipping sync") except: # Failed to hit aggregator, so we'll put those results back into the queue db["result_queue"].update(result_data) for demKey in dem_results.keys(): q = db[demKey] q.update(dem_results[demKey]) self.log.exception("Could not sync with aggregator") self.log.debug('Saving results locally: %r', result_data) db.save() # Note: sys.exit() only raises a SystemExit exception. 
return if response.get("found_server", False): db["has_connected_with_aggregator"] = True db["num_syncs"] += 1 db["last_sync"] = datetime.now().strftime("%m/%d/%Y %H:%M") except: self.log.exception("Error syncing with aggregator") else: if rebuild_metadata: db["last_metadata"] = plugins.hashed_metadata() self.log.info("syncing took %.2f seconds", time.time() - start_time - delay) # Execute any Countermeasures in the response, spawned as separate background processes which can # continue to execute after the agent exits if "enable_countermeasures" in config.options("agent") and \ config.get("agent", "enable_countermeasures").lower() == "true": for countermeasure in response.get("countermeasures", []): hash = countermeasure.get("hash") textkeys = countermeasure.get("textkeys", []) cm_metadata = countermeasure.get("metadata", {}) metadata_file = "" # Write the JSON metadataout to a temp file try: fname = "countermeasure-metadata-%s.json" % hash metadata_file = os.path.join(self.tmp_dir, fname) f = open(metadata_file, 'w') f.write(json.dumps(cm_metadata)) f.close() except Exception: self.log.error("Failed parsing countermeasure metadata for %s: %s" % (hash, textkeys)) self.log.error(traceback.format_exc()) self.log.info("Queueing countermeasures for %s: %s" % (hash, textkeys)) if textkeys: cmd = "%s %s/countermeasure.py execute --hash %s --textkeys %s" % (sys.executable, self.bin_dir, hash, " ".join(textkeys)) if metadata_file: cmd += " --metadata-file %s" % metadata_file os.spawnvp(os.P_NOWAIT, sys.executable, cmd.split()) # now process what we got back from the sync self.update_schedules(response.get('schedules', [])) # process our agent commands if response.get("commands", []): self.log.info("got %d agent commands", len(list(response["commands"].keys()))) if "pause" in response["commands"]: seconds = response["commands"]["pause"] # Seconds db["pause"] = time.time() + seconds if "single_result" in response["commands"]: seconds = response["commands"]["single_result"] db["single_result"] = time.time() + seconds if "log_request" in response["commands"]: lines = response["commands"]["log_request"] # Number of lines to tail from log log_output = subprocess.check_output("tail -%d %s" % (lines, self.log_file), shell=True) db["log_request"] = log_output # We'll send back log output if "queue_batch_size" in response["commands"]: queue_batch_size = response["commands"]["queue_batch_size"] db['result_queue'].queue_batch_size = queue_batch_size if "queue_max_results" in response["commands"]: queue_max_results = response["commands"]["queue_max_results"] db['result_queue'].queue_max_results = queue_max_results if "socket_stats" in response["commands"]: try: args = response["commands"].get("socket_stats") timeout = args.get("timeout") if timeout is None: timeout = 10 timeout = int(timeout) ss_cmd = "ss -t -u -r 2>&1" if which("timeout"): ss_cmd = "timeout %d %s" % (timeout, ss_cmd) socket_stats = subprocess.check_output(ss_cmd, shell=True) db["socket_stats"] = socket_stats except: db["socket_stats"] = traceback.format_exc() if "mtr" in response["commands"]: try: args = response["commands"].get("mtr") host = args.get("host") timeout = args.get("timeout") if timeout is None: timeout = 10 timeout = int(timeout) if host is None: parsed_url = urlparse.urlparse(self.agg_url) if parsed_url.hostname is None: parsed_url = urlparse.urlparse('http://' + self.agg_url) host = parsed_url.hostname mtr_cmd = "mtr --csv -c 1 %s 2>&1" mtr_cmd %= host if which("timeout"): mtr_cmd = "timeout %d %s" % (timeout, mtr_cmd) 
mtr_output = subprocess.check_output(mtr_cmd, shell=True) db["mtr"] = mtr_output except: db["mtr"] = traceback.format_exc() # Change severity of log level log_level_key = response['commands'].get('log_level') if log_level_key is not None: log_level_key = log_level_key.upper() try: log_level = getattr(logging, log_level_key) db['log_level'] = log_level_key self.log.setLevel(log_level) level = logging.INFO message = 'Set log level to "%s"' except AttributeError: level = logging.WARNING message = 'Invalid log level command: "%s"' self.log.log(level, message % log_level_key) if "diagnostics" in response["commands"]: db["diagnostics"] = self.build_diagnostics(db, self.version, self.brand) if "metadata_resync" in response["commands"]: db['last_metadata_time'] = 0 db["last_metadata"] = None if "refresh_countermeasures" in response["commands"] and \ "enable_countermeasures" in config.options("agent") and \ config.get("agent", "enable_countermeasures").lower() == "true" and \ "countermeasures_remote_plugins" in config.options("agent") and \ "countermeasures_refresh_plugins" in config.options("agent"): for url in config.get("agent", "countermeasures_remote_plugins").split(","): self.log.info("Refreshing CounterMeasures plugins from %s" % url) cmd = "%s %s/countermeasure.py install_plugins --url %s &" % (sys.executable, self.bin_dir, url.strip()) os.system(cmd) db["countermeasures_last_refresh"] = time.time() if "rebuild_container_metadata" in response["commands"]: db["rebuild_container_metadata"] = True if 'update_agent' in response['commands']: requested_auto_update = True if 'sync_schedules' in response['commands']: db['sync_schedules'] = True if 'get_logs' in response['commands']: try: self.upload_logs(server_key) except: pass if self.is_root: self.log.info("Linux agent running as root, skipping container discovery") print("Linux agent running as root, skipping container discovery") else: if "docker_supported" in db and db["docker_supported"]: if "docker_containers" not in db: db["docker_containers"] = {} rebuild_container_metadata = False if "rebuild_container_metadata" in db and db["rebuild_container_metadata"]: rebuild_container_metadata = True db["rebuild_container_metadata"] = False existing_containers = db["docker_containers"] existing_container_ids = list(existing_containers.keys()) try: found_containers = container_discovery.discover_docker_containers(config, plugins, existing_containers, rebuild=rebuild_container_metadata) except Exception: t, e = sys.exc_info()[:2] self.log.error(e) self.log.error("Docker has been enabled but the fm-agent user needs to be added to the docker group.\n" "You can do so with `sudo usermod -a -G docker fm-agent`") found_containers = None if found_containers: found_container_ids = [c["Id"][:12] for c in found_containers] new_containers = [] for container in found_containers: container_id = container["Id"][:12] # Always update the db copy, in case something changed existing_containers[container_id] = container if rebuild_container_metadata\ or container_id not in existing_container_ids: new_containers.append(container) if "updated" in container and container["updated"]: del container["updated"] new_containers.append(container) deleted_containers = [] for container_id, container in existing_containers.items(): if container_id not in found_container_ids: deleted_containers.append(container_id) # Actually delete for container_id in deleted_containers: del existing_containers[container_id] if "discovered_containers" not in db: db["discovered_containers"] = [] if 
"deleted_containers" not in db: db["deleted_containers"] = [] db["discovered_containers"].extend(new_containers) db["deleted_containers"].extend(deleted_containers) self.log.info("Discovered %d new/updated containers", len(new_containers)) self.log.info("Found %d newly deleted containers", len(deleted_containers)) self.run_auto_topo_scans(config) except: self.log.exception("Error in main loop") self.checkForUpdate( db=db, server_key=server_key, agg_client=aggregator_client, force=requested_auto_update ) # ideally this should be in a finally block, but older python verisons # don't support try/except/finally, and we need all three db.save() self.log.info('Activity finished in {}s'.format((datetime.now() - activityStart).total_seconds())) def get_fortisase_attributes(self): if not self.is_fortisase_install: return {} try: from forticlient_helper import ForticlientHelper helper = ForticlientHelper() return helper.get_handshake_data() except: return {} def upload_logs(self, server_key): import shutil import tempfile with tempfile.TemporaryDirectory() as tmpdirname: shutil.copytree(self.log_dir, tmpdirname, dirs_exist_ok=True) now = datetime.now() zip_file_prefix = 'agent-logs-{}'.format(now.strftime("%Y%m%d%H%M%S")) zip_output = os.path.join(tmpdirname, 'zip') os.mkdir(zip_output) zip_name = shutil.make_archive( os.path.join('/tmp', zip_file_prefix), 'zip', tmpdirname ) try: endpoint = '{}/v2/agent_logs'.format(self.agg_url) cc = 'curl -F file=@{} -H "Accept: application/json" -H "Authorization: {}" {}'.format( zip_name, server_key, endpoint ) os.system(cc) self.log.info('Uploaded log file {}'.format(zip_name)) except Exception as e: self.log.exception(e) finally: if os.path.isfile(zip_name): os.remove(zip_name) def checkForUpdate(self, db, server_key=None, agg_client=None, force=False): if force: self.log.info('Admin update request') self._onCheckUpdates(agg_client=agg_client) return db_key = 'next_update_check' if not self.auto_update: if db_key in db: db.pop(db_key) return next_update_check = None if not server_key: self.log.warn('checkForUpdate: no server key') return if not agg_client: agg_client = aggregator.Client(self.agg_url, self.version, server_key, proxy_config=self.proxy_config) db_key = 'next_update_check' try: update_period = timedelta(days=1) if db_key not in db: if self.scheduled_update is None: from random import randrange randomSec = randrange(int(update_period.total_seconds())) db[db_key] = datetime.now() + timedelta(seconds=randomSec) else: try: h, m = self.scheduled_update.split(':') rn = datetime.now() ct = datetime(year=rn.year, month=rn.month, day=rn.day, hour=int(h), minute=int(m)) if ct < rn: ct = ct + update_period db[db_key] = ct except Exception as e: self.log.error('Could not calculate next check {}: {}'.format(self.scheduled_update, str(e))) return self.log.info('Next update check at {}'.format(db[db_key])) return next_update_check = db[db_key] if not next_update_check or datetime.now() > next_update_check: self._onCheckUpdates(agg_client) if next_update_check is None: next_update_check = datetime.now() db[db_key] = next_update_check + update_period self.log.info('Next update check at {}'.format(db[db_key])) except Exception as e: self.log.error('checkForUpdates problem: {}'.format(e)) def _onCheckUpdates(self, agg_client): self.log.info('Performing updates check...') # # Note the agent_update_info endpoint expects a framework version, an artifact # of the Windows agent. The aggregator won't use it for darwin, so just send the # our version as the framework. 
        #
        try:
            endpoint = 'agent_update_info/darwin/{}'.format(self.version)
            updates = agg_client.call(endpoint, method='GET')
            if len(updates) > 0:
                self._sendReceive(
                    'updates',
                    port=self.update_service_port,
                    payload=json.dumps(updates))
        except Exception as e:
            self.log.exception('Update check failure: {}'.format(e))

    def get_reportable_anomalies(self):
        # Get the anomalies that are cleared and have previously been reported.
        self.log.info('Gathering reportable anomalies')

        # Get the anomalies that exceed duration and have not previously been
        # reported. Also, mark them as reported.
        cleared_anomalies = []
        lengthy_anomalies = []
        for schedule_id, anomalies in list(self.db['anomalies'].items()):
            schedule = self.db['schedules'].get(schedule_id)
            if not schedule:
                # Resource schedule has been deleted from the central aggregator, but
                # we still have an anomaly - clear that out and proceed
                del self.db["anomalies"][schedule_id]
                continue
            for threshold_id, anomaly in list(anomalies.items()):
                self.log.debug('Threshold %s', threshold_id)
                if not anomaly.reported_as_cleared and anomaly.has_cleared(schedule.number_of_checks):
                    cleared_anomalies.append((
                        schedule_id,
                        threshold_id,
                        time.mktime(anomaly.time_last_detected.timetuple()),
                        False,  # False indicates that this anomaly has cleared
                    ))
                    anomaly.reported_as_cleared = True
                    self.log.debug('Cleared anomaly: %s', anomaly)
                if not anomaly.reported_as_exceeded_duration and anomaly.exceeds_duration():
                    lengthy_anomalies.append((
                        schedule_id,
                        threshold_id,
                        time.mktime(anomaly.time_last_detected.timetuple()),
                        True,  # True indicates that this anomaly has exceeded duration
                    ))
                    anomaly.reported_as_exceeded_duration = True
                    self.log.debug('Lengthy anomaly: %s', anomaly)

        self.log.info('Found %d anomalies that have cleared', len(cleared_anomalies))
        self.log.debug('Cleared anomalies: %r', cleared_anomalies)
        self.log.info('Found %d anomalies that exceed the threshold duration', len(lengthy_anomalies))
        self.log.debug('Lengthy anomalies: %r', lengthy_anomalies)
        self.db.save()
        return cleared_anomalies + lengthy_anomalies

    def remove_reported_cleared_anomalies(self):
        self.log.info('Checking for reported cleared anomalies')
        for schedule_id, anomalies in list(self.db['anomalies'].items()):
            for threshold_id, anomaly in list(anomalies.items()):
                if anomaly.reported_as_cleared:
                    anomaly = anomalies.pop(threshold_id)
                    self.log.info('Removed reported cleared anomaly')
                    self.log.debug('Anomaly: %s', anomaly)
            if not anomalies:
                self.db['anomalies'].pop(schedule_id)
        self.log.debug('Remaining anomalies: %s', self.db['anomalies'])
        self.db.save()

    def update_schedules(self, new_schedules):
        if new_schedules == [] or new_schedules is None:
            self.log.info('No schedule changes received from aggregator')
            return

        existing_schedules = self.db['schedules']
        self.db['schedules'] = {}
        for new_schedule_data in new_schedules:
            new_schedule_id = new_schedule_data['id']
            self.log.info('Received schedule %s from aggregator', new_schedule_id)
            schedule = existing_schedules.get(new_schedule_id, None)
            try:
                if schedule:
                    schedule.update(new_schedule_data)
                    del existing_schedules[schedule.id]
                    action = 'Edited'
                elif not schedule:
                    schedule = Schedule(new_schedule_data)
                    action = 'Created'
                self.db['schedules'][schedule.id] = schedule
                self.log.info('%s schedule %s locally', action, schedule.id)
                self.log.debug('Schedule data: %r', new_schedule_data)
            except Exception:
                err = sys.exc_info()[1]
                error = str(err)
                self.log.error('Invalid schedule {} data: {}'.format(new_schedule_id, error))

        # Our schedule setting doesn't call the correct setitem method,
        # so we'll save here explicitly
        self.db.save()
        self.log.info('Created/updated %d schedules', len(new_schedules))
        # Everything that's left is deleted.
        self.log.info('Deleted %d schedules', len(existing_schedules))

    def build_diagnostics(self, db, version, brand):
        """Function to build a string of diagnostics data to send back to the aggregator."""
        string = "AGENT DIAGNOSTICS\n"
        string += "Agent version: %s\n" % self.version
        string += "Agent server hostname: %s" % subprocess.check_output("hostname", shell=True)
        if 'darwin' == sys.platform:
            string += "Agent OS: %s" % subprocess.check_output("sw_vers | grep ProductVersion", shell=True)
        else:
            string += "Agent OS: %s" % subprocess.check_output("cat /etc/*-release | grep PRETTY_NAME", shell=True)
        string += "uname output: %s" % subprocess.check_output("uname -a", shell=True)
        if 'darwin' != sys.platform:
            string += "Package information: %s\n" % subprocess.check_output("apt-cache show %s-agent || true" % self.brand, shell=True)
            string += "ip output:\n%s" % subprocess.check_output("ip addr show", shell=True)

        # Build pickle data
        string += "Local agent pickle file data:\n%s\n" % json.dumps(db.data, indent=2, default=self.defaultprint)
        return string

    def defaultprint(self, obj):
        if isinstance(obj, Schedule):
            return obj.__repr__()
        else:
            return None

    def open_db(self):
        if not os.path.isdir(self.db_dir):
            os.system('mkdir -p {}'.format(self.db_dir))
        try:
            db = PickleDatabase(self.db_file)
        except:
            return None

        # If something went wrong reading the pickle file, our data dict will
        # be empty and we'll need to rebuild it. To be safe, always go through
        # and add the keys that need to be there, in case something happened
        # to them.
        defaults = {
            'anomalies': {},
            'config_migrated': False,
            'diagnostics': None,
            # 'has_connected_with_aggregator' is to get around the problem of
            # the aggregator issuing a "pause" command to an agent when the
            # server key sent by the agent isn't found on the controlpanel. When
            # an agent is first installed, this is the case, but we don't want
            # to pause the agent. So we add this extra flag so that an agent
            # will only pause if it has communicated with the aggregator before.
            'has_connected_with_aggregator': False,
            'last_metadata': None,
            'last_sync': None,
            'log_level': self.DEFAULT_LOG_LEVEL,
            'num_syncs': 0,
            'pause': None,
            'result_queue': ResultQueue(),
            'schedules': {},
            'single_result': None,
            'sync_schedules': None,
            'check_results': ResultQueue(queue_max_results=1000, queue_batch_size=50),
            'server_resource_levels': ResultQueue(queue_max_results=1000, queue_batch_size=50),
            'traceroutes': ResultQueue(queue_max_results=100, queue_batch_size=5),
            'traceroute_checks': ResultQueue(queue_max_results=50, queue_batch_size=5)
        }
        for key, default in list(defaults.items()):
            if key not in db:
                db[key] = default
        return db

    def should_run_auto_topo_scans(self, config):
        try:
            return config.get('topo', 'auto_scan') == '1'
        except:
            return False

    def get_num_topo_scans(self, config):
        try:
            return int(config.get('topo', 'scans_per_sync'))
        except:
            return 0

    def get_topo_scan_sleep(self, config):
        try:
            return int(config.get('topo', 'scan_sleep'))
        except:
            return 1

    def run_topo_scan(self):
        ss_cmd = "ss -t -u -r 2>&1"
        result = ""
        t = time.time()
        self.log.info("Starting topo scan")
        try:
            result = str(subprocess.check_output(ss_cmd, shell=True))
        except:
            result = traceback.format_exc()
        elapsed = time.time() - t
        self.log.info("Topo scan complete. Elapsed time: %.2f seconds" % elapsed)
        return result

    def run_auto_topo_scans(self, config):
        if not self.should_run_auto_topo_scans(config):
            return
        n = self.get_num_topo_scans(config)
        scan_sleep = self.get_topo_scan_sleep(config)
        if "auto_topo_scans" not in self.db:
            self.db["auto_topo_scans"] = []
        for i in range(n):
            t = time.time()
            scan = self.run_topo_scan()
            self.db["auto_topo_scans"].append((t, scan))
            time.sleep(scan_sleep)

    def get_dem_wifi_info(self):
        if not self.has_dem:
            return None
        response = self._sendReceive(
            'wifi-info',
            port=self.dem_port
        )
        try:
            return json.loads(response)
        except:
            return None

    def _getDemResults(self, db):
        rv = {}
        if not self.has_dem:
            return rv
        response = self._sendReceive('collect', port=self.dem_port)
        if response is None:
            return rv
        latestResults = json.loads(response)
        for key in latestResults.keys():
            try:
                q = db[key]
                if q.isEmpty():
                    rv[key] = latestResults[key]
                else:
                    q.update(latestResults[key])
                    rv[key] = q.pop_results()
            except Exception as e:
                self.log.error('_getDemResults: {}'.format(e))
                continue
        return rv

    def _agent_version_updated(self, db):
        has_update = False
        if "last_ran_version" in db and db["last_ran_version"]:
            if db["last_ran_version"] != self.version:
                db["last_ran_version"] = self.version
                has_update = True
        else:
            db["last_ran_version"] = self.version
            has_update = True
        return has_update

    def _init_dem_schedules(self, client):
        try:
            response = client.call('schedules', method='GET')
            schedules = {
                'icmp_server_resources': response.get('icmp_server_resources', []),
                'monitor_schedules': response.get('monitor_schedules', []),
                'traceroute_checks': response.get('traceroute_checks', [])
            }
            self._sendReceive('initSchedules', port=self.dem_port, payload=json.dumps(schedules))
        except Exception as aggEx:
            logging.error('/schedules error: {}'.format(str(aggEx)))
        return

    def _updateDEMServiceSchedules(self, newSchedules):
        if self.has_dem:
            _ = self._sendReceive('update-schedules', port=self.dem_port, payload=json.dumps(newSchedules))

    def _sendReceive(self, command, port=None, payload=None):
        if not port:
            raise Exception('Server port not configured')
        import socket
        if payload:
            command = '{}:{}'.format(command, payload)
        command += '\n'
        # self.log.info('Send {}'.format(command))
        try:
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.settimeout(10.0)
                sock.connect(os.path.join(self.ipcPath, port))
                toSend = command.encode('utf-8')
                sock.sendall(toSend)
                receivedBytes = bytes()
                while True:
                    r = sock.recv(1024)
                    # self.log.info('  received {} bytes'.format(len(r)))
                    if 0 == len(r):
                        break
                    receivedBytes += r
                # self.log.info('Send {} bytes: received {} bytes'.format(len(toSend), len(receivedBytes)))
                received = receivedBytes.decode('utf-8')
                if 'nack' == received:
                    return None
                return received
        except Exception as e:
            self.log.error('Send/recv failure: {}'.format(e))
            return None
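For reference, a minimal client-side sketch of the framing that _sendReceive uses when talking to the DEM helper over its Unix-domain socket: a newline-terminated "command[:payload]" string, answered by a UTF-8 reply in which the literal string "nack" signals failure. The socket path and the standalone function below are hypothetical placeholders for illustration (the agent builds the real path from self.ipcPath and a port name); this is not part of agent.py.

import json
import socket


def send_receive(sock_path, command, payload=None, timeout=10.0):
    """Send one newline-terminated 'command[:payload]' request and read the reply.

    Mirrors the framing used by Agent._sendReceive; sock_path and the example
    command in __main__ are illustrative assumptions, not agent defaults.
    """
    if payload is not None:
        command = "{}:{}".format(command, json.dumps(payload))
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        sock.connect(sock_path)
        sock.sendall((command + "\n").encode("utf-8"))
        chunks = []
        while True:
            data = sock.recv(1024)
            if not data:
                break
            chunks.append(data)
    reply = b"".join(chunks).decode("utf-8")
    # The helper answers 'nack' on failure, mirroring _sendReceive's handling.
    return None if reply == "nack" else reply


if __name__ == "__main__":
    # Hypothetical socket path; requires a listening DEM helper to succeed.
    try:
        print(send_receive("/tmp/fm-agent-dem.sock", "wifi-info"))
    except OSError as exc:
        print("No DEM helper listening: {}".format(exc))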