Utilisation d’Amazon SWF Pour communiquer entre les serveurs

Utilisez Amazon SWF pour communiquer des messages entre des serveurs?

  1. Sur le serveur, AI veut exécuter un script
  2. Lorsque cela est terminé, je veux envoyer un message au serveur B pour exécuter un script B
  3. Si cela se termine avec succès, je souhaite que le travail soit effacé de la queue de workflow

J’ai beaucoup de mal à comprendre comment utiliser Boto et SWF en combinaison. Je ne suis pas après un code complet mais ce que je suis après est si quelqu’un peut expliquer un peu plus sur ce qui est impliqué.

  • Comment puis-je dire au serveur B de vérifier la fin du script A?
  • Comment puis-je m’assurer que le serveur A ne va pas terminer le script A et essayer le script B (puisque le serveur B doit l’exécuter)?
  • Comment puis-je informer SWF du script A Achèvement? Êtes-vous un drapeau, un message ou quoi?

Comme vous pouvez le voir, je suis vraiment confus à propos de tout cela, si quelqu’un peut nous éclairer, je l’apprécierais vraiment.

Je pense que vous posez de très bonnes questions qui soulignent à quel point SWF peut être utile en tant que service. En bref, vous ne dites pas à vos serveurs de coordonner le travail entre eux. Votre décideur orchestre tout cela pour vous, avec l’aide du service SWF.

La mise en œuvre de votre stream de travail se déroulera comme suit:

  1. Enregistrer votre stream de travail et ses activités avec le service (une seule fois).
  2. Mettre en œuvre le décideur et les travailleurs.
  3. Laissez courir vos travailleurs et vos décideurs.
  4. Démarrer un nouveau workflow.

Il existe un certain nombre de façons d’alimenter les informations d’identification dans le code de boto.swf. Pour les besoins de cet exercice, je vous recommande de les exporter dans l’environnement avant d’exécuter le code ci-dessous:

export AWS_ACCESS_KEY_ID= export AWS_SECRET_ACCESS_KEY= 

1) Pour enregistrer le domaine, le workflow et les activités sont exécutés comme suit:

 # ab_setup.py import boto.swf.layer2 as swf DOMAIN = 'stackoverflow' ACTIVITY1 = 'ServerAActivity' ACTIVITY2 = 'ServerBActivity' VERSION = '1.0' swf.Domain(name=DOMAIN).register() swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register() swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register() swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register() 

2) Mettre en œuvre et exécuter des décideurs et des travailleurs.

 # ab_decider.py import time import boto.swf.layer2 as swf DOMAIN = 'stackoverflow' ACTIVITY1 = 'ServerAActivity' ACTIVITY2 = 'ServerBActivity' VERSION = '1.0' class ABDecider(swf.Decider): domain = DOMAIN task_list = 'default_tasks' version = VERSION def run(self): history = self.poll() # Print history to familiarize yourself with its format. print history if 'events' in history: # Get a list of non-decision events to see what event came in last. workflow_events = [e for e in history['events'] if not e['eventType'].startswith('Decision')] decisions = swf.Layer1Decisions() # Record latest non-decision event. last_event = workflow_events[-1] last_event_type = last_event['eventType'] if last_event_type == 'WorkflowExecutionStarted': # At the start, get the worker to fetch the first assignment. decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()), ACTIVITY1, VERSION, task_list='a_tasks') elif last_event_type == 'ActivityTaskCompleted': # Take decision based on the name of activity that has just completed. # 1) Get activity's event id. last_event_attrs = last_event['activityTaskCompletedEventAtsortingbutes'] completed_activity_id = last_event_attrs['scheduledEventId'] - 1 # 2) Extract its name. activity_data = history['events'][completed_activity_id] activity_attrs = activity_data['activityTaskScheduledEventAtsortingbutes'] activity_name = activity_attrs['activityType']['name'] # 3) Optionally, get the result from the activity. result = last_event['activityTaskCompletedEventAtsortingbutes'].get('result') # Take the decision. if activity_name == ACTIVITY1: # Completed ACTIVITY1 just came in. Kick off ACTIVITY2. decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()), ACTIVITY2, VERSION, task_list='b_tasks', input=result) elif activity_name == ACTIVITY2: # Server B completed activity. We're done. decisions.complete_workflow_execution() self.complete(decisions=decisions) return True 

Les travailleurs sont beaucoup plus simples, vous n’avez pas besoin d’utiliser l’inheritance si vous ne le souhaitez pas.

 # ab_worker.py import os import time import boto.swf.layer2 as swf DOMAIN = 'stackoverflow' ACTIVITY1 = 'ServerAActivity' ACTIVITY2 = 'ServerBActivity' VERSION = '1.0' class MyBaseWorker(swf.ActivityWorker): domain = DOMAIN version = VERSION task_list = None def run(self): activity_task = self.poll() print activity_task if 'activityId' in activity_task: # Get input. # Get the method for the requested activity. try: self.activity(activity_task.get('input')) except Exception, error: self.fail(reason=str(error)) raise error return True def activity(self, activity_input): raise NotImplementedError class WorkerA(MyBaseWorker): task_list = 'a_tasks' def activity(self, activity_input): result = str(time.time()) print 'worker a reporting time: %s' % result self.complete(result=result) class WorkerB(MyBaseWorker): task_list = 'b_tasks' def activity(self, activity_input): result = str(os.getpid()) print 'worker b returning pid: %s' % result self.complete(result=result) 

3) Exécutez vos décideurs et vos travailleurs. Votre décideur et vos employés peuvent être exécutés à partir d’hôtes distincts ou d’un seul et même ordinateur. Ouvrez quatre terminaux et exécutez vos acteurs:

D’abord votre décideur

 $ python -i ab_decider.py >>> while ABDecider().run(): pass ... 

Ensuite, le travailleur A, vous pouvez le faire à partir du serveur A:

 $ python -i ab_workers.py >>> while WorkerA().run(): pass 

Ensuite, le travailleur B, peut-être du serveur B, mais si vous les utilisez tous depuis un ordinateur portable, cela fonctionnera tout aussi bien:

 $ python -i ab_workers.py >>> while WorkerB().run(): pass ... 

4) Enfin, lancez le workflow.

 $ python Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) [GCC 4.4.3] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import boto.swf.layer2 as swf >>> workflows = swf.Domain(name='stackoverflow').workflows() >>> workflows [] >>> execution = workflows[0].start(task_list='default_tasks') >>> 

Revenez en arrière pour voir ce qui se passe avec vos acteurs. Ils peuvent se déconnecter du service après une minute d’inactivité. Si cela se produit, appuyez sur la flèche vers le haut + entrée pour ré-entrer dans la boucle d’interrogation.

Vous pouvez maintenant accéder au panneau SWF de votre console de gestion AWS, vérifier le déroulement des exécutions et consulter leur historique. Sinon, vous pouvez le demander via la ligne de commande.

 >>> execution.history() [{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 'workflowExecutionStartedEventAtsortingbutes': {'taskList': {'name': 'default_tasks'}, 'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': '1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAtsortingbutes': {'startToCloseTimeout': '300', 'taskList': {'name': ... 

Ceci est juste un exemple de workflow avec exécution en série des activités, mais il est également possible pour le décideur de planifier et de coordonner l’exécution parallèle des activités .

J’espère que cela vous permettra au moins de démarrer. Pour un exemple légèrement plus complexe d’un workflow en série, je vous recommande de regarder ceci .

Je n’ai aucun exemple de code à partager, mais vous pouvez certainement utiliser SWF pour coordonner l’exécution des scripts sur deux serveurs. L’idée principale est de créer trois morceaux de code qui parlent à SWF:

  • Un composant qui sait quel script exécuter en premier et que faire une fois le premier script exécuté. Cela s’appelle le “décideur” en termes de SWF.
  • Deux composants comprennent chacun comment exécuter le script spécifique que vous souhaitez exécuter sur chaque machine. Ceux-ci sont appelés “travailleurs de l’activité” en termes de SWF.

Le premier composant, le décideur, appelle deux API SWF: PollForDecisionTask et RespondDecisionTaskCompleted. La requête de sondage donnera à la composante de décision l’historique actuel d’un stream de travail en cours d’exécution, essentiellement les informations d’état «où suis-je» pour votre exécuteur de script. Vous écrivez du code qui examine ces événements et déterminez quel script doit être exécuté. Ces “commandes” pour exécuter un script seraient sous la forme d’une planification d’une tâche d’activité, qui est renvoyée dans le cadre de l’appel à RespondDecisionTaskCompleted.

Les deuxièmes composants que vous écrivez, les travailleurs de l’activité, appellent chacun deux API SWF: PollForActivityTask et RespondActivityTaskCompleted. La demande de sondage indique au travailleur de l’activité qu’il doit exécuter le script qu’il connaît, ce que SWF appelle une tâche d’activité. Les informations renvoyées par la demande d’interrogation au fichier SWF peuvent inclure des données spécifiques à l’exécution qui ont été envoyées à SWF dans le cadre de la planification de la tâche d’activité. Chacun de vos serveurs interrogerait indépendamment SWF pour les tâches d’activité pour indiquer l’exécution du script local sur cet hôte. Une fois que l’agent a exécuté le script, il renvoie à SWF via l’API RespondActivityTaskCompleted.

Le rappel de votre activité à SWF entraîne la transmission d’un nouvel historique au composant de décision que j’ai déjà mentionné. Il examinera l’historique, verra que le premier script est terminé et planifie l’exécution du second. Une fois qu’il a vu que le second est terminé, il peut “fermer” le workflow en utilisant un autre type de décision.

Vous lancez tout le processus d’exécution des scripts sur chaque hôte en appelant l’API StartWorkflowExecution. Cela crée l’enregistrement du processus global dans SWF et lance le premier historique dans le processus de décision pour planifier l’exécution du premier script sur le premier hôte.

J’espère que cela donne un peu plus de contexte sur la façon d’accomplir ce type de stream de travail en utilisant SWF. Si vous ne l’avez pas déjà fait, je jetterais un coup d’oeil au guide de développement sur la page SWF pour plus d’informations.

Vous pouvez utiliser SNS, lorsque le script A est terminé, il doit déclencher SNS, ce qui déclenche une notification au serveur B

bon exemple,

De plus, si vous ne souhaitez pas exporter vos informations d’identification vers l’environnement, vous pouvez appeler dans vos classes:

 swf.set_default_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)