Arrêter le serveur tordu

J’ai un ServerFactory tordu que j’utilise pour envoyer des travaux aux clients. Le protocole contient une queue contenant tous les travaux. Une fois cette queue épuisée, si un client demande un nouveau travail, le serveur déconnecte le client. Finalement, cela ne laissera aucun client connecté et le serveur sera prêt à être arrêté.

Ma question est la suivante:

Existe-t-il une pratique généralement acceptée pour arrêter ServerFactory une fois son travail terminé?

Je sais que pour les clients, la meilleure pratique consiste à utiliser twisted.internet.task.react pour se connecter et gérer la perte de connexion, fermant ainsi le processus parent. Mais je ne suis pas sûr que ce soit le cas également pour un serveur.

Actuellement, c’est comme ça que je m’arrête:

from twisted.application import internet, service from twisted.internet import reactor from twisted.internet.protocol import ServerFactory from twisted.protocols.basic import LineReceiver class ServerProtocol(LineReceiver): """Twisted Protocol for sending and receiving lines of bytes.""" clients = [] logger = logging.getLogger('launcher.Launcher.RunServer') def connectionMade(self) -> None: """When a connection is made add the client to the clients list.""" self.clients.append(self) def lineReceived(self, line: bytes) -> None: """Whenver a line is received send work to the sending client. Parameters ---------- line The message received from a client. """ msg = 'Received: ' + line.decode('utf-8') + ' from ' +\ self.transport.hostname self.logger.info(msg) if not self.queue.empty(): run = self.queue.get() run_bytes = bytes(run, 'utf-8') self.logger.info('Sending run bytes to %s', self.transport.hostname) self.sendLine(run_bytes) else: self.clients.remove(self) self.transport.loseConnection() if not self.clients: self.logger.info('Shutting down RunServer') self.reactor.stop() class RunServer(object): """Class for containing twisted server components. Parameters ---------- workers List of workers that will serve as clients. queue Queue of runs to execute. Atsortingbutes ---------- factory Twisted ServerFactory for producing protocols. """ def __init__(self, queue: Queue) -> None: self.factory = ServerFactory() self.factory.protocol = ServerProtocol self.factory.protocol.queue = queue self.factory.protocol.reactor = reactor def start(self) -> None: """Start the server and thereby the execution of runs.""" self.factory.protocol.reactor.listenTCP(80, self.factory) self.factory.protocol.reactor.run() 

Comme vous pouvez le constater, je stocke le réacteur dans self.factory.protocol.reactor et self.factory.protocol.reactor une fois toutes les tâches épuisées et les clients déconnectés.

Je suis sûr que j’ai lu avant que ce n’est pas le modèle accepté pour les clients en cours d’exécution et je suppose que la même chose vaut pour les serveurs, mais je n’ai pas encore vu un bon exemple.

Je dois faire connaissance avec celui-ci.

Il n’y a absolument pas besoin de la classe RunServer. Sous-classer ServerFactory et placer la logique de RunServer.__init__ dans les sous-classes __init__ autorisera le même comportement avec un meilleur contrôle. Vous pouvez ensuite simplement définir une méthode main et utiliser react comme décrit dans la documentation ( twisted.internet.task.react )

Voici le code mis à jour:

 from twisted.internet.defer import Deferred from twisted.internet import reactor from twisted.internet.protocol import ServerFactory from twisted.protocols.basic import LineReceiver class QueueingProtocol(LineReceiver): def connectionMade(self) -> None: self.factory.connectionMade() def connectionLost(self, reason) -> None: self.factory.connectionLost(reason) def lineReceived(self, line: bytes) -> None: msg = 'Received: ' + line.decode('utf-8') + ' from ' +\ self.transport.hostname self.logger.info(msg) if self.factory.empty(): self.transport.lostConnection() else: run = self.factory.get() run_bytes = bytes(run, 'utf-8') self.logger.info('Sending run bytes to %s', self.transport.hostname) self.sendLine(run_bytes) class QueueingFactory(ServerFactory): protocol = QueueingProtocol def __init__(self, queue) -> None: self.queue = queue self.connections = 0 self.queueHandled = Deferred() def connectionMade(self) -> None: self.connections += 1 def empty(self): return self.queue.empty() def get(self): return self.queue.get() def connectionLost(self, reason) -> None: self.connections -= 1 if self.connections == 0 and self.empty(): self.queueHandled.callback("done") def main(reactor, queue): factory = QueueingFactory(queue) reactor.listenTCP(80, factory) return factory.queueHandled 

Ensuite, il vous suffit d’importer le react(main, [some_queue]) main endroit où vous en avez besoin et d’appeler react(main, [some_queue])