import zlib

    import Queue as queue
    import queue

import threading, time, sys
import os, json, traceback, socket, tempfile, atexit
from contextlib import closing

    import xmlrpclib
    import xmlrpc.client as xmlrpclib

    from xmlrpc.server import SimpleXMLRPCServer
    from xmlrpc.server import SimpleXMLRPCRequestHandler
except:                                                         # pragma: no cover
    from SimpleXMLRPCServer import SimpleXMLRPCServer           # pragma: no cover
    from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler   # pragma: no cover

    from socketserver import ThreadingMixIn
except:                                                         # pragma: no cover
    from SocketServer import ThreadingMixIn                     # pragma: no cover

from collections import OrderedDict

debug = False

[docs]class CommNode(object): server_types = { "NEURON": "Blender", "Blender": "NEURON", "Control-Blender": "Blender", "Control-NEURON": "NEURON", "Package": "Blender", } def __init__(self, server_end, on_client_connected=None, on_server_setup=None, coverage=False): self.coverage = coverage self.groups = OrderedDict() self.root_index = OrderedDict() self.load_config() if server_end in self.server_types.keys(): self.server_end = server_end self.client_end = self.server_types[server_end] else: raise Exception("Unrecognized server_end: " + str(server_end) + ". Should be one of: " + str(self.server_types.keys())) self.on_client_connected = on_client_connected self.on_server_setup = on_server_setup # Try connecting to the other node (if it's running) self.try_setup_client() # 'Control' nodes are 1-directional (a node with a connected client, but no server of its own) if 'Control' in self.server_end: return # Create a server # Package type does not result in server if server_end != 'Package': self.setup_server() # If successfully connected, then instruct the other node to connect back # and complete the 2nd half of the connection if self.client is not None: self.client.try_setup_client() if self.client is not None and hasattr(self, 'server') and self.server is not None: self.print_safe("Two-way communication between %s and %s established" %(self.server_end, self.client_end)) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): if self.coverage: self.client.end_code_coverage() self.stop_server()
[docs] def load_config(self): package_dir = os.path.dirname(os.path.abspath(__file__)) config_file = os.path.join(package_dir, 'config.json') with open(config_file, "r") as f: self.config = json.load(f)[0]
[docs] def init_task_queue(self): self.queue = queue.Queue() self.task_lock = threading.Lock() self.tasks = {} self.task_next_id = 0
[docs] def setup_server(self): class ErrorHandler(SimpleXMLRPCRequestHandler): def _dispatch(self, method, params): try: return self.server.funcs[method](*params) except: import traceback traceback.print_exc() raise class CommNodeServer(ThreadingMixIn, SimpleXMLRPCServer, object): """ A helper class to create an XML-RCP server """ def __init__(self, param): self.daemon_threads = True super(CommNodeServer, self).__init__( param, requestHandler=ErrorHandler, allow_none=True, logRequests=False ) self.init_task_queue() port = self.config["default_port"][self.server_end] self.server_ip = self.config["default_ip"][self.server_end] self.server_port = self.find_free_port() if port == "" else port self.server_address = 'http://' + self.server_ip + ':' + self.server_port self.server = CommNodeServer((self.server_ip, int(self.server_port))) self.server.register_introspection_functions() # Basic server functions self.server.register_function(self.sm_stop, 'stop') self.server.register_function(self.sm_ping, 'ping') self.server.register_function(self.try_setup_client, 'try_setup_client') # Synchronous execution self.server.register_function(self.sm_run_command, 'run_command') # Asynchronous task execution queueing self.server.register_function(self.sm_enqueue_command, 'enqueue_command') self.server.register_function(self.sm_get_task_status, 'get_task_status') self.server.register_function(self.sm_get_task_error, 'get_task_error') self.server.register_function(self.sm_get_task_result, 'get_task_result') # Code coverage result saving self.server.register_function(self.sm_end_code_coverage, 'end_code_coverage') # Start the server in a separate thread - it will place tasks onto queue self.server_thread = threading.Thread(target=self.server.serve_forever) self.server_thread.daemon = True self.server_thread.start() # Perform the servicing of queued tasks in a separate thread self.service_thread = threading.Thread(target=self.service_queue_loop) self.service_thread.daemon = True self.service_thread_continue = True # When false, queue servicing thread will stop self.service_thread.start() if self.on_server_setup is not None: self.on_server_setup() # Communicate the address of the server to the client self.save_server_address_file() self.print_safe("Started " + self.server_end + " server at: " + self.server_address)
[docs] def stop_server(self): if hasattr(self, "service_thread") and self.service_thread is not None and self.service_thread.is_alive(): self.service_thread_continue = False self.service_thread.join() self.service_thread = None self.init_task_queue() if hasattr(self, "server_thread") and self.server_thread is not None and self.server_thread.is_alive(): try: own_server_client = xmlrpclib.ServerProxy(self.server_address, allow_none=True) own_server_client.stop() self.server_thread.join() self.server_thread = None except: # pragma: no cover pass # pragma: no cover if hasattr(self, "server"): self.server = None self.server_ip = None self.server_port = None self.server_address = None self.save_server_address_file()
[docs] def find_free_port(self): """ :return: A random available port """ with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.bind(('localhost', 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return str(s.getsockname()[1])
[docs] def try_setup_client(self, warn=True): try: config_ip = self.config["default_ip"][self.client_end] config_port = self.config["default_port"][self.client_end] if config_ip != "" and config_port != "": client_address = 'http://' + config_ip + ':' + config_port else: # Try reading the client address file client_address = self.read_client_address_file() # Create XML-RCP client and attempt to connect to it self.client = xmlrpclib.ServerProxy(client_address, allow_none=True) assert == 1 # If connection succeeded, save the address self.client_address = client_address except (IOError, ValueError, AssertionError): if warn: self.print_safe("Could not connect to " + self.client_end + " server. Ensure "+ ("Blender+BlenderNEURON addon" if self.server_end == "NEURON" else "NEURON+BlenderNEURON package")+ " is running.") self.client = None self.client_address = None if self.client is not None and self.on_client_connected is not None: self.on_client_connected(self)
[docs] def read_client_address_file(self): """ Read from a file in tmp folder that contains the address of the other node's XML-RCP server :return: The address of the other end's XML-RCP server """ tmp_dir = os.path.abspath(tempfile.gettempdir()) file_name = self.get_end_address_file(self.client_end) if not os.path.exists(file_name): raise IOError("Client address file not found:" + file_name) with open(file_name, "r") as f: client_address = if "http" not in client_address: raise ValueError("Address in the client address file does not appear to be valid:" + client_address) # pragma: no cover return client_address
[docs] def save_server_address_file(self): """ Create a file in tmp folder that contains the address of this node's XML-RCP server :return: Nothing """ file_name = self.get_end_address_file(self.server_end) # If the server address is blank (i.e. on cleanup), remove the file if it exists if self.server_address is None: if os.path.exists(file_name): try: os.remove(file_name) except OSError: # pragma: no cover pass # pragma: no cover else: with open(file_name, "w") as f: f.write(self.server_address)
[docs] def get_end_address_file(self, end): tmp_dir = os.path.abspath(tempfile.gettempdir()) file_name = os.path.join(tmp_dir, "BlenderNEURON-" + end + ".address") return file_name
[docs] def print_safe(self, value): if debug == False: return try: # pragma: no cover print(value) # pragma: no cover except: # pragma: no cover tb = traceback.format_exc() # pragma: no cover print(tb) # pragma: no cover
[docs] def sm_stop(self): """ A method that, when called by a client of the node server, will stop the server :return: """ if hasattr(self, "server") and self.server is not None: self.server.shutdown() self.server.server_close() return 0
[docs] def sm_ping(self): self.print_safe(self.server_end + " server at " + self.server_address + " ALIVE") return 1
[docs] def sm_run_command(self, command_string): exec_lambda = self._get_command_lambda(command_string) return self._run_lambda(exec_lambda)
[docs] def sm_enqueue_command(self, command_string): exec_lambda = self._get_command_lambda(command_string) return self._enqueue_lambda(exec_lambda)
[docs] def sm_end_code_coverage(self): try: print('Getting Coverage info', self.server_end) from blenderneuron import COV except: # pragma: no cover print('Failed to get COV', self.server_end) # pragma: no cover # Dont try to save coverage info if not in coverage return # pragma: no cover COV.stop() # pragma: no cover # pragma: no cover print('SAVED Coverage info', self.server_end) # pragma: no cover
def _get_command_lambda(self, command_string): """ Execute arbitrary python command within Blender's python process :param command_string: A python expression. Set variable 'return_value' to receive a response. e.g. command_string = "print('test')" -> will print 'test' in Blender console e.g. command_string = "return_value = 1+3" -> will compute in Blender and return 4 e.g. command_string = "import bpy; return_value = [i for i in]" -> will list all Blender objects :return: """ def exec_lambda(): end_imports = self.config["imports"][self.server_end] try: exec(end_imports + "; " + command_string, globals()) except SystemExit: raise except: print('Error while attempting to run the following command(s) within ' + self.server_end + ':') print('------------ Command ------------') print(command_string.replace(";","\n")) print('---------------------------------') raise return globals().pop('return_value', None) return exec_lambda def _run_lambda(self, task_lambda): id = self._enqueue_lambda(task_lambda) while self.sm_get_task_status(id) == 'QUEUED': time.sleep(0.1) status = self.sm_get_task_status(id) if status == "SUCCESS": return self.sm_get_task_result(id) else: raise Exception(self.sm_get_task_error(id)) def _enqueue_lambda(self, task_lambda): task_id = self._get_new_task_id() task = { "id": task_id, "status": "QUEUED", "lambda": task_lambda, "result": None, "error": None } self.tasks[task_id] = task self.queue.put(task) return task_id def _get_new_task_id(self): with self.task_lock: task_id = self.task_next_id self.task_next_id += 1 return task_id
[docs] def sm_get_task_status(self, task_id): if task_id in self.tasks: return self.tasks[task_id]["status"] return "DOES_NOT_EXIST"
[docs] def sm_get_task_error(self, task_id): return self.tasks[task_id]["error"]
[docs] def sm_get_task_result(self, task_id): return self.tasks[task_id]["result"]
[docs] def work_on_queue_tasks(self): q = self.queue self.queue_error = False while not q.empty(): self.print_safe("Tasks in queue. Getting next task...") task = q.get() try: if not self.queue_error: self.print_safe("Running task...") result = task["lambda"]() task["result"] = result task["status"] = "SUCCESS" else: self.print_safe("Previous task had an error. SKIPPING.") task["status"] = "ERROR" except: self.queue_error = True tb = traceback.format_exc() if "SystemExit" not in tb: task["status"] = "ERROR" task["error"] = tb self.print_safe(tb) else: task["status"] = "SUCCESS" task["result"] = None # We want to allow the RCP server to send back a response before killing the process def self_destruct(): self.print_safe("Exiting NEURON in 1s ... ") time.sleep(0.5) quit() thread = threading.Thread(target=self_destruct) thread.start() q.task_done() self.print_safe("DONE")
[docs] def service_queue_loop(self): while self.service_thread_continue: if not self.queue.empty(): self.work_on_queue_tasks() else: time.sleep(0.1)
[docs] def compress(self, obj): compressed = str(obj) try: compressed = xmlrpclib.Binary(zlib.compress(compressed, 2)) except: compressed = xmlrpclib.Binary(zlib.compress(compressed.encode('utf8'), 2)) return compressed
[docs] def decompress(self, compressed): uncompressed = eval(zlib.decompress('utf-8')) return uncompressed