Source code for module.libs.snmpworker

# -*- coding: utf-8 -*-

# Copyright (C) 2012-2014:
#    Thibault Cohen,
# This file is part of SNMP Booster Shinken Module.
# Shinken is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# Shinken is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with SNMP Booster Shinken Module.
# If not, see <>.

""" This module contains a class to create a Thread which make SNMP requests
and handle answers with callbacks


from threading import Thread
import re
import time

from shinken.log import logger

    from pysnmp.entity.rfc3413.oneliner import cmdgen
    from pysnmp.smi.exval import noSuchInstance
except ImportError as exp:
    logger.error("[SnmpBooster] [code 0601] Import error. Pysnmp is missing")
    raise ImportError(exp)

[docs]class SNMPWorker(Thread): """ Thread which execute all SNMP tasks/requests """ def __init__(self, mapping_queue, max_prepared_tasks): Thread.__init__(self) self.cmdgen = None # will be cmdgen.AsynCommandGenerator() self.mapping_queue = mapping_queue self.max_prepared_tasks = max_prepared_tasks self.must_run = False self.task_prepared = 0
[docs] def append_task_to_dispatcher(self, snmp_task): if snmp_task['type'] in ['bulk', 'next', 'get']: # Append snmp requests snmp_command_name = ("async" + snmp_task['type'].capitalize() + "Cmd") getattr(self.cmdgen, snmp_command_name)(**snmp_task['data']) # Mark task as done self.mapping_queue.task_done() self.task_prepared += 1 else: # If the request is not handled error_message = ("Bad SNMP requets type: '%s'. Must be " "get, next or bulk." % snmp_task['type']) logger.error("[SnmpBooster] [code 0603] [%s] " "%s" % (snmp_task['host'], error_message))
[docs] def run(self): """ Process SNMP tasks SNMP task is a dict: - For a bulk request :: {"authData": cmdgen.CommunityData('public') "transportTarget": cmdgen.UdpTransportTarget((transportTarget, 161)) "nonRepeaters": 0 "maxRepetitions": 64 "varNames": ['', '...'] "cbInfo:: (cbFun, (arg1, arg2, ...)) } - For a next request :: {"authData": cmdgen.CommunityData('public') "transportTarget": cmdgen.UdpTransportTarget((transportTarget, 161)) "varNames": ['', '...'] "cbInfo:: (cbFun, (arg1, arg2, ...)) } - For a get request :: {"authData": cmdgen.CommunityData('public) "transportTarget": cmdgen.UdpTransportTarget((transportTarget, 161)) "varNames": ['', '...'] "cbInfo:: (cbFun, (arg1, arg2, ...)) } """ self.must_run = True"[SnmpBooster] [code 0602] is starting") slow_host_waiting = [] while self.must_run: # Prevent memory leak del(self.cmdgen) self.cmdgen = cmdgen.AsynCommandGenerator() # End prevent memory leak self.task_prepared = 0 # slow host slow_host_prepared = [] # Process slow hosts tasks for index, snmp_task in enumerate(slow_host_waiting): # Check if we have our max prepared tasks if self.task_prepared > self.max_prepared_tasks: break # Handle slow hosts if snmp_task['no_concurrency']: if snmp_task['host'] in slow_host_prepared: continue else: slow_host_prepared.append(snmp_task['host']) # Add task dispatcher slow_host_waiting.pop(index) self.append_task_to_dispatcher(snmp_task) # Process normal tasks while (not self.mapping_queue.empty()) and self.task_prepared <= self.max_prepared_tasks: # Get task snmp_task = self.mapping_queue.get() # Handle slow hosts if snmp_task['no_concurrency']: if snmp_task['host'] in slow_host_prepared: slow_host_waiting.append(snmp_task) continue else: slow_host_prepared.append(snmp_task['host']) # Add task dispatcher self.append_task_to_dispatcher(snmp_task) if self.task_prepared > 0: # Launch SNMP requests self.cmdgen.snmpEngine.transportDispatcher.runDispatcher() else: # Sleep time.sleep(0.1)"[SnmpBooster] [code 0604] is stopped")
[docs] def stop_worker(self): """ Stop SNMP worker thread """"[SnmpBooster] [code 0605] will be stopped") self.must_run = False
[docs]def handle_snmp_error(error_indication, cb_ctx, request_type): """ Handle SNMP errors """ if error_indication is None: # No error return False # Get results results = cb_ctx[0] # Current elected service result service_result = cb_ctx[1] # Log SNMP error logger.error("[SnmpBooster] [code 0606] [%s] SNMP Error: " "%s" % (service_result['host'], str(error_indication))) # If is a get request if request_type == "get": # We set SNMP error in all oids for result in results.values(): result['error'] = str(error_indication) return True
[docs]def callback_get(send_request_handle, error_indication, error_status, error_index, var_binds, cb_ctx): """ Callback function for GET SNMP requests """ # Get the oid list results = cb_ctx[0] # Current elected service result service_result = cb_ctx[1] # Get queue to submit result result_queue = cb_ctx[2] # Handle errors if handle_snmp_error(error_indication, cb_ctx, "get"): # set as received service_result['state'] = 'received' result_queue.put(results) return False # browse reponses for oid, value in var_binds: # for each oid, value # prepare the oid oid = "." + oid.prettyPrint() # if we need this oid if oid in results: # Check if we have a nosuchinstance error if value == noSuchInstance: # Log NoSuchInstance SNMP error message = "Oid not found on the device: %s" % oid logger.error("[SnmpBooster] [code 0607] [%s, %s] SNMP Error: " "%s" % (results.values()[0]['key']['host'], results[oid]['key']['service'], message)) results[oid]['error'] = message else: # save value results[oid]['value'] = value # save check time results[oid]['check_time'] = time.time() # Check if we get all values result_with_value_or_error = [oid['value'] for oid in results.values() if oid.get('value') is None and oid.get('error') is None] if len(result_with_value_or_error) == 0: # Add a saving task to the saving queue # (processed by the function save_results) result_queue.put(results) # Prepare datas for the current service tmp_results = [r for r in results.values() if r['key']['host'] == service_result['host'] and r['key']['service'] == service_result['service']] for tmp_result in tmp_results: key = tmp_result.get('key') # ds name ds_names = key.get('ds_names') for ds_name in ds_names: # Last value last_value_key = ".".join(("ds", ds_name, key.get('oid_type') + "_value_last" ) ) # New value value_key = ".".join(("ds", ds_name, key.get('oid_type') + "_value" ) ) # Set last value service_result['db_data']['ds'][ds_name][last_value_key] = tmp_result.get('value_last') # Set value service_result['db_data']['ds'][ds_name][value_key] = tmp_result.get('value') # Set last check time service_result['db_data']['check_time_last'] = service_result['db_data'].get('check_time') # Set check time service_result['db_data']['check_time'] = time.time() # set as received service_result['state'] = 'received' # Calculate execution time service_result['execution_time'] = time.time() - service_result['start_time'] else: pass # Not all data are received, we need to wait an other query
[docs]def callback_mapping_next(send_request_handle, error_indication, error_status, error_index, var_binds, cb_ctx): """ Callback function for GENEXT SNMP requests """ # Retrive context mapping_oid = cb_ctx[0] result = cb_ctx[2] # Handle errors if handle_snmp_error(error_indication, cb_ctx, "next"): result['finished'] = True return False # Parse snmp results for table_row in var_binds: for oid, instance_name in table_row: oid = "." + oid.prettyPrint() # Test if we are not in the mapping oid if not oid.startswith(mapping_oid): # We are not in the mapping oid result['finished'] = True return False instance = oid.replace(mapping_oid + ".", "") # DEBUGGING # print "OID", oid # print "MAPPING", mapping_oid # print "VAL", instance_name.prettyPrint() # END DEBUGGING # Handle illegal characters cleaned_instance_name = re.sub("[,:/ ]", "_", str(instance_name)) # If we need this instance we store it if instance_name in result['data']: result['data'][instance_name] = instance # If we need this 'cleaned' instance we store it elif cleaned_instance_name in result['data']: result['data'][cleaned_instance_name] = instance # Check if mapping is finished if all(result['data'].values()): result['finished'] = True return False return True
[docs]def callback_mapping_bulk(send_request_handle, error_indication, error_status, error_index, var_binds, cb_ctx): """ Callback function for BULK SNMP requests """ # Retrive context mapping_oid = cb_ctx[0] result = cb_ctx[2] # Handle errors if handle_snmp_error(error_indication, cb_ctx, "bulk"): result['finished'] = True return False # Parse snmp results for table_row in var_binds: for oid, instance_name in table_row: oid = "." + oid.prettyPrint() # Test if we are not in the mapping oid if not oid.startswith(mapping_oid): # We are not in the mapping oid result['finished'] = True return False # Get instance instance = oid.replace(mapping_oid + ".", "") # DEBUGGING # print "OID", oid # print "MAPPING", mapping_oid # print "VAL", instance_name.prettyPrint() # END DEBUGGING # Handle illegal characters cleaned_instance_name = re.sub("[,:/ ]", "_", str(instance_name)) # If we need this instance we store it if instance_name in result['data']: result['data'][instance_name] = instance # If we need this 'cleaned' instance we store it elif cleaned_instance_name in result['data']: result['data'][cleaned_instance_name] = instance # Check if mapping is finished if all(result['data'].values()): result['finished'] = True return False return True