Claro, puedes hacerlo con threads; puedes hacer tu propio thread pool.
PD1:
Los threads de Python no dan paralelismo real en código CPU-bound por el GIL (Global Interpreter Lock); sí sirven para tareas de I/O.
PD2: Puedes usar multiprocessing y aprovechar al 100% todos los CPU que tengas.
Aquí te dejo un ejemplo con threads:
Código Python:
import time
import threading
import logging
import random

# The stdlib queue module was renamed Queue -> queue in Python 3;
# this import works on both (the rest of the example uses print()
# as a function, which is Python 3 syntax).
try:
    import queue as Queue  # Python 3
except ImportError:
    import Queue  # Python 2

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

# Putting this value on the queue tells a worker thread to stop.
sentinel = None
queue = Queue.Queue()
num_threads = 5
def foobar_task(queue, stop=None):
    """Worker loop: consume items from *queue* until *stop* is seen.

    Parameters:
        queue: a Queue.Queue of work items; task_done() is called for
            every item taken, including the sentinel, so a later
            queue.join() cannot hang on an unacknowledged get().
        stop: sentinel object that signals shutdown (default None,
            matching the module-level ``sentinel``).
    """
    import logging
    log = logging.getLogger(__name__)
    while True:
        item = queue.get()
        if item is stop:
            # Acknowledge the sentinel too, then exit quietly instead
            # of logging it as if it were a real task.
            queue.task_done()
            break
        log.info('task called: {n}'.format(n=item))
        queue.task_done()
# Spin up the worker pool.
threads = []
for _ in range(num_threads):
    worker = threading.Thread(target=foobar_task, args=(queue,))
    worker.start()
    threads.append(worker)

# Feed random work items until we happen to draw a value above the cutoff.
while True:
    value = random.random()
    if value > .999:
        break
    queue.put(value)

# Wait until every queued item has been acknowledged with task_done()...
queue.join()

# ...then hand each worker a sentinel so it exits, and wait for them.
for _ in range(num_threads):
    queue.put(sentinel)
for worker in threads:
    worker.join()
logger.info("threads are closed")
Y aquí te dejo otro usando multiprocessing:
Código Python:
import multiprocessing
import time
class Consumer(multiprocessing.Process):
    """Worker process that executes tasks pulled from a shared queue.

    Runs callables taken from *task_queue* until it sees the ``None``
    poison pill, putting each call's return value on *result_queue*.
    """

    def __init__(self, lock, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.lock = lock              # guards stdout across processes
        self.task_queue = task_queue
        self.result_queue = result_queue

    def process_safe_print(self, message):
        """Print *message* while holding the shared lock.

        ``with`` guarantees the lock is released even if print raises;
        a bare acquire()/release() pair would leak the lock in that case.
        """
        with self.lock:
            print(message)

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown.
                self.process_safe_print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            self.process_safe_print('%s: %s' % (proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return
class Task(object):
    """One unit of work: multiplies two numbers when invoked.

    Instances travel through a multiprocessing queue, so they hold
    only plain picklable attributes.
    """

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(1)  # pretend to take some time to do the work
        product = self.a * self.b
        return '%s * %s = %s' % (self.a, self.b, product)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
    lock = multiprocessing.Lock()

    # Establish communication queues.
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers; oversubscribe 2x since the tasks mostly sleep.
    # NOTE: range() replaces Python-2-only xrange() -- the script
    # already uses print() as a function, and range works on both.
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating %d consumers' % num_consumers)
    consumers = [Consumer(lock, tasks, results)
                 for _ in range(num_consumers)]
    for w in consumers:
        w.start()

    # Enqueue jobs.
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer.
    for _ in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish, then for the workers to exit.
    tasks.join()
    for w in consumers:
        w.join()

    # Drain and print the results.
    while num_jobs:
        result = results.get()
        print('Result: %s' % result)
        num_jobs -= 1