Arrêt du processus Python en raison de l’ouverture des connexions ssh de Paramiko

J’utilise Paramiko pour surveiller les journaux sur les ordinateurs distants lors d’un test.

Le moniteur se passe dans un thread de démon qui fait à peu près ceci:

ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) transport = ssh.get_transport() channel = transport.open_session() channel.exec_command('sudo tail -f ' + self.logfile) last_partial = '' while not self.stopped.isSet(): try: if None == select or None == channel: break rl, wl, xl = select.select([channel], [], [], 1.0) if None == rl: break if len(rl) > 0: # Must be stdout, how can I check? line = channel.recv(1024) else: time.sleep(1.0) continue except: break if line: #handle saving the line... lines are 'merged' so that one log is made from all the sources ssh.close() 

J’ai eu des problèmes avec les lectures de blocage, alors j’ai commencé à faire les choses de cette façon et la plupart du temps ça marche bien. Je pense que je rencontre des problèmes en cas de lenteur du réseau.

Parfois, je vois cette erreur à la fin d’une parsing (après que self.stopped ci-dessus est défini). J’ai essayé de dormir après m’être arrêté et avoir rejoint tous les threads du moniteur, mais le blocage peut toujours se produire.

 Exception in thread Thread-9 (most likely raised during interpreter shutdown): Traceback (most recent call last): File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner File "/usr/lib/python2.6/site-packages/paramiko/transport.py", line 1470, in run : 'NoneType' object has no atsortingbute 'error' 

Dans transport.py de Paramiko, je pense que c’est là que se trouve l’erreur. Recherchez les valeurs suivantes:

  self._channel_handler_table[ptype](chan, m) elif chanid in self.channels_seen: self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid) else: self._log(ERROR, 'Channel request for unknown channel %d' % chanid) self.active = False self.packetizer.close() elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table): self.auth_handler._handler_table[ptype](self.auth_handler, m) else: self._log(WARNING, 'Oops, unhandled type %d' % ptype) msg = Message() msg.add_byte(cMSG_UNIMPLEMENTED) msg.add_int(m.seqno) self._send_message(msg) except SSHException as e: self._log(ERROR, 'Exception: ' + str(e)) self._log(ERROR, util.tb_ssortingngs()) #<<<<<<<<<<<<<<<<<<<<<<<<<<< line 1470 self.saved_exception = e except EOFError as e: self._log(DEBUG, 'EOF in transport thread') #self._log(DEBUG, util.tb_strings()) self.saved_exception = e except socket.error as e: if type(e.args) is tuple: if e.args: emsg = '%s (%d)' % (e.args[1], e.args[0]) else: # empty tuple, eg socket.timeout emsg = str(e) or repr(e) else: emsg = e.args self._log(ERROR, 'Socket exception: ' + emsg) self.saved_exception = e except Exception as e: self._log(ERROR, 'Unknown exception: ' + str(e)) self._log(ERROR, util.tb_strings()) 

Quand une course est bloquée, je peux lancer >>>>> sudo lsof -i -n | egrep ‘\’ pour voir qu’il y a bien des connexions ssh bloquées (indéfiniment bloquées). Mon processus de test principal est le PID 15010.

 sshd 6478 root 3u IPv4 46405 0t0 TCP *:ssh (LISTEN) sshd 6478 root 4u IPv6 46407 0t0 TCP *:ssh (LISTEN) sshd 14559 root 3r IPv4 3287615 0t0 TCP 172.16.0.171:ssh- >10.42.80.100:59913 (ESTABLISHED) sshd 14563 cmead 3u IPv4 3287615 0t0 TCP 172.16.0.171:ssh->10.42.80.100:59913 (ESTABLISHED) python 15010 root 12u IPv4 3291525 0t0 TCP 172.16.0.171:43227->172.16.0.142:ssh (ESTABLISHED) python 15010 root 15u IPv4 3291542 0t0 TCP 172.16.0.171:41928->172.16.0.227:ssh (ESTABLISHED) python 15010 root 16u IPv4 3291784 0t0 TCP 172.16.0.171:57682->172.16.0.48:ssh (ESTABLISHED) python 15010 root 17u IPv4 3291779 0t0 TCP 172.16.0.171:43246->172.16.0.142:ssh (ESTABLISHED) python 15010 root 20u IPv4 3291789 0t0 TCP 172.16.0.171:41949->172.16.0.227:ssh (ESTABLISHED) python 15010 root 65u IPv4 3292014 0t0 TCP 172.16.0.171:51886->172.16.0.226:ssh (ESTABLISHED) sshd 15106 root 3r IPv4 3292962 0t0 TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED) sshd 15110 cmead 3u IPv4 3292962 0t0 TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED) 

Donc, je veux juste que mon processus ne pende pas. Oh, et je ne veux pas mettre Paramiko à jour si cela nécessite une mise à jour de Python au-delà de la version 2.6.6, car sur centos et d’après ce que j’ai lu après la version 2.6.6, cela peut être compliqué.

Merci pour toutes les idées.


Commentaire à shavenwarthog qui est trop long pour les commentaires:

Bonjour, merci pour la réponse. J’ai quelques questions rapides. 1) Et si je devais arrêter les threads à une heure inconnue? En d’autres termes, les threads tail -f blah.log dureront peut-être 3 minutes et je veux peut-être vérifier les données accumulées 10 fois dans ces trois minutes? 2) un peu pareil, je suppose, quand j’ai essayé ceci avec quelques machines distantes réelles il ne quitterait pas (puisque la queue -f ne quitte jamais). Je l’avais oublié mais je pense que la lecture non bloquante devait résoudre ce problème. Pensez-vous que l’ autre sujet que vous avez commenté et celui-ci suffisent pour que cela fonctionne? Fondamentalement, utilisez ma lecture non bloquante pour rassembler des données locales sur chaque thread de coureur. Ensuite, je n’aurais besoin de verrouiller que lorsque le thread principal veut des données de chaque coureur, ce qui semble vouloir dissortingbuer mon verrou unique pour dire 10 verrous et cela aiderait. Cela a-t-il du sens?

Le code suivant exécute une commande sur plusieurs hôtes. Lorsque chaque commande a des données en attente, elles sont imprimées à l’écran.

La forme générale est adaptée du code d’ Alex Martelli . Cette version a plus de journalisation, y compris une version lisible par l’homme de l’hôte de chaque connexion.

Le code d’origine a été écrit pour les commandes qui s’exécutent puis sortent. Je l’ai changé pour imprimer des données de manière incrémentielle, quand elles sont disponibles. Auparavant, le premier thread ayant saisi le verrou bloquait le read() et tous les threads mourraient de faim. La nouvelle solution contourne cela.

EDIT , quelques notes:

Pour arrêter le programme ultérieurement, nous nous heurtons à une situation plutôt difficile. Les threads sont ininterruptibles – nous ne pouvons pas simplement configurer un gestionnaire de signaux pour sys.exit() le programme. Le code mis à jour est configuré pour sortir en toute sécurité après 3 secondes, en utilisant une boucle while pour join() chaque thread. Pour le code réel, si le parent se ferme, les threads doivent également être correctement. Notez attentivement les deux AVERTISSEMENTS dans le code, car l’interaction signal / sortie / thread est plutôt irrégulière.

Le code traite les données telles qu’elles se présentent – les données sont maintenant imprimées sur la console. Il n’utilise pas de lectures non bloquantes car 1) le code non bloquant est beaucoup plus complexe, et 2) le programme d’origine n’a pas traité les données du thread enfant dans le parent. Pour les threads, il est plus facile de faire tout le thread enfant, qui écrit dans un fichier, une firebase database ou un service. Pour tout ce qui est plus compliqué, utilisez le multiprocessing ce qui est beaucoup plus facile, et dispose de bonnes fonctionnalités pour effectuer de nombreux travaux et les redémarrer s’ils meurent. Cette bibliothèque vous permet également de répartir la charge sur plusieurs processeurs, ce que ne permet pas le threading.

S’amuser!

EDIT # 2

Notez qu’il est possible, et probablement préférable, d’exécuter plusieurs processus sans utiliser le threading ou le multiprocessing . TLDR: utilisez Popen et une boucle select() pour traiter des lots de sortie. Voir l’exemple de code dans Pastebin: exécutez plusieurs commandes sans sous-processus / multitraitement

la source

 # adapted from https://stackoverflow.com/questions/3485428/creating-multiple-ssh-connections-at-a-time-using-paramiko import signal, sys, threading import paramiko CMD = 'tail -f /var/log/syslog' def signal_cleanup(_signum, _frame): print '\nCLEANUP\n' sys.exit(0) def workon(host): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(host) _stdin, stdout, _stderr = ssh.exec_command(CMD) for line in stdout: print threading.current_thread().name, line, def main(): hosts = ['localhost', 'localhost'] # exit after a few seconds (see WARNINGs) signal.signal(signal.SIGALRM, signal_cleanup) signal.alarm(3) threads = [ threading.Thread( target=workon, args=(host,), name='host #{}'.format(num+1) ) for num,host in enumerate(hosts) ] print 'starting' for t in threads: # WARNING: daemon=True allows program to exit when main proc # does; otherwise we'll wait until all threads complete. t.daemon = True t.start() print 'joining' for t in threads: # WARNING: t.join() is uninterruptible; this while loop allows # signals # see: http://snakesthatbite.blogspot.com/2010/09/cpython-threading-interrupting.html while t.is_alive(): t.join(timeout=0.1) print 'done!' if __name__=='__main__': main() 

sortie

 starting joining host #2 Jun 27 16:28:25 palaarm kernel: [158950.369443] ideapad_laptop: Unknown event: 1 host #2 Jun 27 16:29:12 palaarm kernel: [158997.098833] ideapad_laptop: Unknown event: 1 host #1 Jun 27 16:28:25 palaarm kernel: [158950.369443] ideapad_laptop: Unknown event: 1 host #1 Jun 27 16:29:12 palaarm kernel: [158997.098833] ideapad_laptop: Unknown event: 1 host #1 Jun 27 16:29:36 palaarm kernel: [159020.809748] ideapad_laptop: Unknown event: 1