Le script Python utilise la boucle while pour continuer à mettre à jour les scripts de travail et à traiter plusieurs fois les tâches dans la queue

J’essaie d’écrire un script python analysant un dossier et de collecter les scripts SQL mis à jour, puis d’extraire automatiquement les données du script SQL. Dans le code, une boucle while parsing le nouveau fichier SQL et envoie à la fonction d’extraction de données. J’ai du mal à comprendre comment faire une file dynamic avec la boucle while, mais j’ai aussi un multi-processus pour exécuter les tâches dans la queue.

Le code suivant a un problème: l’itération de la boucle while travaillera sur un travail long avant de passer à la prochaine itération et collectera les autres travaux pour remplir le processeur vacant.

Mettre à jour:

  1. Merci à @pbacterio pour attraper le bogue, et maintenant le message d’erreur est parti. Après avoir modifié le code, le code python peut prendre tous les scripts de travail pendant une itération et dissortingbuer les scripts à quatre processeurs. Cependant, il sera bloqué par un long travail pour passer à la prochaine itération, en analysant et en soumettant les scripts de travail nouvellement ajoutés. Une idée de comment reconstruire le code?

  2. J’ai finalement compris la solution, voir la réponse ci-dessous. Il s’est avéré que ce que je cherchais est

    the_queue = Queue ()
    the_pool = Pool (4, worker_main, (the_queue,))

  3. Pour ceux qui ont trébuché sur cette idée similaire, voici l’ensemble de l’architecture de ce script d’automatisation qui convertit un lecteur partagé en un «serveur pour extraction SQL» ou tout autre «serveur» de file d’attente.

    une. Le script python auto_data_pull.py comme indiqué dans la réponse. Vous devez append votre propre fonction.

    b. Un ‘script de traitement par lots’ avec les éléments suivants:

    démarrer C: \ Anaconda2 \ python.exe C: \ Users \ bin \ auto_data_pull.py

    c. Ajoutez une tâche déclenchée par le démarrage de l’ordinateur, exécutez le script de traitement par lots. Ça marche.

Code Python:

 from glob import glob import os, time import sys import CSV import re import subprocess import pandas as PD import pypyodbc from multiprocessing import Process, Queue, current_process, freeze_support # # Function run by worker processes # def worker(input, output): for func, args in iter(input.get, 'STOP'): result = compute(func, args) output.put(result) # # Function used to compute result # def compute(func, args): result = func(args) return '%s says that %s%s = %s' % \ (current_process().name, func.__name__, args, result) def query_sql(sql_file): #test func #jsl file processing and SQL querying, data table will be saved to csv. fo_name = os.path.splitext(sql_file)[0] + '.csv' fo = open(fo_name, 'w') print sql_file fo.write("sql_file {0} is done\n".format(sql_file)) return "Query is done for \n".format(sql_file) def check_files(path): """ arguments -- root path to monitor returns -- dictionary of {file: timestamp, ...} """ sql_query_dirs = glob(path + "/*/IDABox/") files_dict = {} for sql_query_dir in sql_query_dirs: for root, dirs, filenames in os.walk(sql_query_dir): [files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for filename in filenames if filename.endswith('.jsl')] return files_dict ##### working in single thread def single_thread(): path = "Y:/" before = check_files(path) sql_queue = [] while True: time.sleep(3) after = check_files(path) added = [f for f in after if not f in before] deleted = [f for f in before if not f in after] overlapped = list(set(list(after)) & set(list(before))) updated = [f for f in overlapped if before[f] < after[f]] before = after sql_queue = added + updated # print sql_queue for sql_file in sql_queue: try: query_sql(sql_file) except: pass ##### not working in queue def multiple_thread(): NUMBER_OF_PROCESSES = 4 path = "Y:/" sql_queue = [] before = check_files(path) # get the current dictionary of sql_files task_queue = Queue() done_queue = Queue() while True: #while loop to check the changes of the files time.sleep(5) after = check_files(path) added = [f for f in after if not f in before] deleted = [f for f in before if not f in after] overlapped = list(set(list(after)) & set(list(before))) updated = [f for f in overlapped if before[f] < after[f]] before = after sql_queue = added + updated TASKS = [(query_sql, sql_file) for sql_file in sql_queue] # Create queues #submit task for task in TASKS: task_queue.put(task) for i in range(NUMBER_OF_PROCESSES): p = Process(target=worker, args=(task_queue, done_queue)).start() # try: # p = Process(target=worker, args=(task_queue)) # p.start() # except: # pass # Get and print results print 'Unordered results:' for i in range(len(TASKS)): print '\t', done_queue.get() # Tell child processes to stop for i in range(NUMBER_OF_PROCESSES): task_queue.put('STOP') # single_thread() if __name__ == '__main__': # freeze_support() multiple_thread() 

Référence:

  1. surveiller les modifications de fichiers avec un script python: http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html
  2. Multiprocessing:
    https://docs.python.org/2/library/multiprocessing.html

    Où avez-vous défini sql_file dans multiple_thread() dans

     multiprocessing.Process(target=query_sql, args=(sql_file)).start() 

    Vous n’avez pas défini sql_file dans la méthode et vous avez en outre utilisé cette variable dans une boucle for. La scope de la variable est uniquement limitée à la boucle for.

    Essayez de remplacer ceci:

     result = func(*args) 

    par ça:

     result = func(args) 

    J’ai compris cela. Merci pour la réponse qui a inspiré la pensée. Maintenant, le script peut exécuter une boucle while pour surveiller le dossier du nouveau script SQL mis à jour / ajouté, puis dissortingbuer les données en les tirant sur plusieurs threads. La solution provient des fichiers queue.get () et queue.put (). Je suppose que l’object de queue s’occupe de la communication par lui-même.

    Ceci est le code final –

     from glob import glob import os, time import sys import pypyodbc from multiprocessing import Process, Queue, Event, Pool, current_process, freeze_support def query_sql(sql_file): #test func #jsl file processing and SQL querying, data table will be saved to csv. fo_name = os.path.splitext(sql_file)[0] + '.csv' fo = open(fo_name, 'w') print sql_file fo.write("sql_file {0} is done\n".format(sql_file)) return "Query is done for \n".format(sql_file) def check_files(path): """ arguments -- root path to monitor returns -- dictionary of {file: timestamp, ...} """ sql_query_dirs = glob(path + "/*/IDABox/") files_dict = {} try: for sql_query_dir in sql_query_dirs: for root, dirs, filenames in os.walk(sql_query_dir): [files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for filename in filenames if filename.endswith('.jsl')] except: pass return files_dict def worker_main(queue): print os.getpid(),"working" while True: item = queue.get(True) query_sql(item) def main(): the_queue = Queue() the_pool = Pool(4, worker_main,(the_queue,)) path = "Y:/" before = check_files(path) # get the current dictionary of sql_files while True: #while loop to check the changes of the files time.sleep(5) sql_queue = [] after = check_files(path) added = [f for f in after if not f in before] deleted = [f for f in before if not f in after] overlapped = list(set(list(after)) & set(list(before))) updated = [f for f in overlapped if before[f] < after[f]] before = after sql_queue = added + updated if sql_queue: for jsl_file in sql_queue: try: the_queue.put(jsl_file) except: print "{0} failed with error {1}. \n".format(jsl_file, str(sys.exc_info()[0])) pass else: pass if __name__ == "__main__": main()