2012-08-16 8 views
14

Estoy escribiendo una aplicación que agrega líneas al mismo archivo de varios subprocesos.python: se agrega al mismo archivo desde varios subprocesos

Tengo un problema en el que algunas líneas se anexan sin una nueva línea.

¿Alguna solución para esto?

class PathThread(threading.Thread): 
    def __init__(self, queue): 
     threading.Thread.__init__(self) 
     self.queue = queue 

    def printfiles(self, p): 
     for path, dirs, files in os.walk(p): 
      for f in files: 
       print(f, file=output) 

    def run(self): 
     while True: 
      path = self.queue.get() 
      self.printfiles(path) 
      self.queue.task_done() 


pathqueue = Queue.Queue() 
paths = getThisFromSomeWhere() 

output = codecs.open('file', 'a') 

# spawn threads 
for i in range(0, 5): 
    t = PathThread(pathqueue) 
    t.setDaemon(True) 
    t.start() 

# add paths to queue 
for path in paths: 
    pathqueue.put(path) 

# wait for queue to get empty 
pathqueue.join() 
+3

Publicar un cierto código, eso ayudaría. –

+2

anexe una nueva línea. – Kuf

+1

Suena como * impossibru *. – plaes

Respuesta

22

La solución es escribir en el archivo en un único subproceso.

import Queue # or queue in Python 3 
import threading 

class PrintThread(threading.Thread): 
    def __init__(self, queue): 
     threading.Thread.__init__(self) 
     self.queue = queue 

    def printfiles(self, p): 
     for path, dirs, files in os.walk(p): 
      for f in files: 
       print(f, file=output) 

    def run(self): 
     while True: 
      result = self.queue.get() 
      self.printfiles(result) 
      self.queue.task_done() 

class ProcessThread(threading.Thread): 
    def __init__(self, in_queue, out_queue): 
     threading.Thread.__init__(self) 
     self.in_queue = in_queue 
     self.out_queue = out_queue 

    def run(self): 
     while True: 
      path = self.in_queue.get() 
      result = self.process(path) 
      self.out_queue.put(result) 
      self.in_queue.task_done() 

    def process(self, path): 
     # Do the processing job here 

pathqueue = Queue.Queue() 
resultqueue = Queue.Queue() 
paths = getThisFromSomeWhere() 

output = codecs.open('file', 'a') 

# spawn threads to process 
for i in range(0, 5): 
    t = ProcessThread(pathqueue, resultqueue) 
    t.setDaemon(True) 
    t.start() 

# spawn threads to print 
t = PrintThread(resultqueue) 
t.setDaemon(True) 
t.start() 

# add paths to queue 
for path in paths: 
    pathqueue.put(path) 

# wait for queue to get empty 
pathqueue.join() 
resultqueue.join() 
+0

en ProcessThread, the line - result = self.process (path) ? no hay método de proceso de colmena() allí .. – user1251654

+0

Se supone que debe definir el método de proceso para hacer lo que desee. Solo modifico el código para aclarar esto. – Dikei

+0

bien, mi mal. Gracias. este ayuda mucho – user1251654

0

Y tal vez algunas nuevas líneas donde no deberían estar? debe tener en cuenta el hecho de que no se debe acceder a un recurso compartido por más de un hilo a la vez o de lo contrario podrían ocurrir consecuencias impredecibles. (se llama usando 'operaciones atómicas' al usar hilos) Eche un vistazo a esta página para tener un poco de intuición.
Thread-Synchronization

1

el hecho de que nunca se ve el texto confuso en la misma línea o líneas nuevas en el medio de una línea es un indicio de que en realidad no necesita sincronizar anexar al archivo. el problema es que usa imprimir para escribir en un único manejador de archivo. sospecho que print está haciendo 2 operaciones para el manejador de archivo en una llamada y esas operaciones están corriendo entre los hilos. básicamente print está haciendo algo como:

file_handle.write('whatever_text_you_pass_it') 
file_handle.write(os.linesep) 

y debido a diferentes hilos están haciendo esto de forma simultánea en el mismo identificador de archivo a veces un hilo se pondrá en la primera escritura y el otro hilo a continuación, ponerse en su primera escritura y entonces Obtendré dos devoluciones de carro seguidas. o realmente cualquier permutación de estos.

la forma más sencilla de evitar esto es dejar de usar print y simplemente usar write directamente. intente algo como esto:

output.write(f + os.linesep) 

esto todavía me parece peligroso. No estoy seguro de qué garantías puede esperar con todos los subprocesos utilizando el mismo objeto de manejo de archivos y compitiendo por su búfer interno. Personalmente, el paso lateral del problema es todo y simplemente haga que cada hilo obtenga su propio identificador de archivo. También tenga en cuenta que esto funciona porque el valor predeterminado para las descargas de búfer de escritura es el búfer de línea, por lo que cuando finaliza en el archivo termina en os.linesep. para forzarlo a utilizar buffer de línea envíe un 1 como el tercer argumento de open. puedes probarlo así:

#!/usr/bin/env python 
import os 
import sys 
import threading 

def hello(file_name, message, count): 
    with open(file_name, 'a', 1) as f: 
    for i in range(0, count): 
     f.write(message + os.linesep) 

if __name__ == '__main__': 
    #start a file 
    with open('some.txt', 'w') as f: 
    f.write('this is the beginning' + os.linesep) 
    #make 10 threads write a million lines to the same file at the same time 
    threads = [] 
    for i in range(0, 10): 
    threads.append(threading.Thread(target=hello, args=('some.txt', 'hey im thread %d' % i, 1000000))) 
    threads[-1].start() 
    for t in threads: 
    t.join() 
    #check what the heck the file had 
    uniq_lines = set() 
    with open('some.txt', 'r') as f: 
    for l in f: 
     uniq_lines.add(l) 
    for u in uniq_lines: 
    sys.stdout.write(u) 

La salida tiene el siguiente aspecto:

hey im thread 6 
hey im thread 7 
hey im thread 9 
hey im thread 8 
hey im thread 3 
this is the beginning 
hey im thread 5 
hey im thread 4 
hey im thread 1 
hey im thread 0 
hey im thread 2 
Cuestiones relacionadas