J’ai un ServerFactory tordu que j’utilise pour envoyer des travaux aux clients. Le protocole contient une queue contenant tous les travaux. Une fois cette queue épuisée, si un client demande un nouveau travail, le serveur déconnecte le client. Finalement, cela ne laissera aucun client connecté et le serveur sera prêt à être arrêté.
Ma question est la suivante:
Je sais que pour les clients, la meilleure pratique consiste à utiliser twisted.internet.task.react pour se connecter et gérer la perte de connexion, fermant ainsi le processus parent. Mais je ne suis pas sûr que ce soit le cas également pour un serveur.
Actuellement, c’est comme ça que je m’arrête:
from twisted.application import internet, service from twisted.internet import reactor from twisted.internet.protocol import ServerFactory from twisted.protocols.basic import LineReceiver class ServerProtocol(LineReceiver): """Twisted Protocol for sending and receiving lines of bytes.""" clients = [] logger = logging.getLogger('launcher.Launcher.RunServer') def connectionMade(self) -> None: """When a connection is made add the client to the clients list.""" self.clients.append(self) def lineReceived(self, line: bytes) -> None: """Whenver a line is received send work to the sending client. Parameters ---------- line The message received from a client. """ msg = 'Received: ' + line.decode('utf-8') + ' from ' +\ self.transport.hostname self.logger.info(msg) if not self.queue.empty(): run = self.queue.get() run_bytes = bytes(run, 'utf-8') self.logger.info('Sending run bytes to %s', self.transport.hostname) self.sendLine(run_bytes) else: self.clients.remove(self) self.transport.loseConnection() if not self.clients: self.logger.info('Shutting down RunServer') self.reactor.stop() class RunServer(object): """Class for containing twisted server components. Parameters ---------- workers List of workers that will serve as clients. queue Queue of runs to execute. Atsortingbutes ---------- factory Twisted ServerFactory for producing protocols. """ def __init__(self, queue: Queue) -> None: self.factory = ServerFactory() self.factory.protocol = ServerProtocol self.factory.protocol.queue = queue self.factory.protocol.reactor = reactor def start(self) -> None: """Start the server and thereby the execution of runs.""" self.factory.protocol.reactor.listenTCP(80, self.factory) self.factory.protocol.reactor.run()
Comme vous pouvez le constater, je stocke le réacteur dans self.factory.protocol.reactor
et self.factory.protocol.reactor
une fois toutes les tâches épuisées et les clients déconnectés.
Je suis sûr que j’ai lu avant que ce n’est pas le modèle accepté pour les clients en cours d’exécution et je suppose que la même chose vaut pour les serveurs, mais je n’ai pas encore vu un bon exemple.
Je dois faire connaissance avec celui-ci.
Il n’y a absolument pas besoin de la classe RunServer. Sous-classer ServerFactory
et placer la logique de RunServer.__init__
dans les sous-classes __init__
autorisera le même comportement avec un meilleur contrôle. Vous pouvez ensuite simplement définir une méthode main
et utiliser react
comme décrit dans la documentation ( twisted.internet.task.react
)
Voici le code mis à jour:
from twisted.internet.defer import Deferred from twisted.internet import reactor from twisted.internet.protocol import ServerFactory from twisted.protocols.basic import LineReceiver class QueueingProtocol(LineReceiver): def connectionMade(self) -> None: self.factory.connectionMade() def connectionLost(self, reason) -> None: self.factory.connectionLost(reason) def lineReceived(self, line: bytes) -> None: msg = 'Received: ' + line.decode('utf-8') + ' from ' +\ self.transport.hostname self.logger.info(msg) if self.factory.empty(): self.transport.lostConnection() else: run = self.factory.get() run_bytes = bytes(run, 'utf-8') self.logger.info('Sending run bytes to %s', self.transport.hostname) self.sendLine(run_bytes) class QueueingFactory(ServerFactory): protocol = QueueingProtocol def __init__(self, queue) -> None: self.queue = queue self.connections = 0 self.queueHandled = Deferred() def connectionMade(self) -> None: self.connections += 1 def empty(self): return self.queue.empty() def get(self): return self.queue.get() def connectionLost(self, reason) -> None: self.connections -= 1 if self.connections == 0 and self.empty(): self.queueHandled.callback("done") def main(reactor, queue): factory = QueueingFactory(queue) reactor.listenTCP(80, factory) return factory.queueHandled
Ensuite, il vous suffit d’importer le react(main, [some_queue])
main
endroit où vous en avez besoin et d’appeler react(main, [some_queue])