понедельник, 30 января 2012 г.

sqlalchemy

Sqlalchemy - фреймворк для работы с базами данных с технолгией ORM. Почитать про него можно тут. Моей задачей было понять, как использовать этот фреймворк в потоках.


import sqlalchemy
from sqlalchemy.orm import sessionmaker, scoped_session

connection = "mysql://root:@localhost/mydatabase"
# pool_size - указывает сколько запросов хранить в очереди, если будет больше, вылетит exception
engine = sqlalchemy.create_engine(connection, pool_size=30)
# Создаем объект Session и дальше во всех потоках используем только его.
Session = scoped_session(sessionmaker(bind=engine))

#Accounts - моя таблица, к которой мне нужно делать запросы, код по настройке отображения я опустил.
accounts = Session.query(Accounts)
account = accounts[0]

Один интересный момент, account, созданный в одном потоке и переданный в другой, изменить не получится.
Если мы поменяем атрибуты account в новом потоке и выполним Session.commit() - таблица не обновится.

суббота, 14 января 2012 г.

Threading и KeyboardInterrupt.

Как организовать пул потоков? Допустим у меня есть 100 данных (пусть они находятся в каком-либо списке) и все их нужно переработать в 10 потоков. Стартуем 10 потоков, в каждый передаем по 1 данному, следим за потоками, как только один отработал, стартуем новый и так пока не закончатся данные... В общем, это не то, чтобы бред, но есть способ поудобней.

За всё время работы скрипта, стартует только 10 потоков, просто каждый из них, отработав со своим 1 данным, не дохнет, а берет из списка следующее 1 данное и делает с ним эту же работу:


# -*- coding: utf-8 -*-
import threading, Queue, time
import traceback



class Worker(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.__queue = queue

def run(self):
while True:

try: item = self.__queue.get_nowait() # ждём данные
except Queue.Empty: break # данные закончились, прекращаем работу

try: self.work(item) # работа
except Exception: traceback.print_exc()

time.sleep(0.5)
self.__queue.task_done() # задача завершена
return
def work(self,item):
print item
#pass


def main():
# Выводим в 5 потоков цифры от 1 до 100.
queue = Queue.Queue()
num_threads = 5 # 5 потоков

for x in xrange(100):
queue.put(x) # заносим данные в очередь
for i in xrange(num_threads):
t = Worker(queue) # создаем поток
t.start() # стартуем
time.sleep(0.1) # чтобы в консоли друг на друга не накладывались

queue.join() #блокируем выполнение программы, пока не будут израсходованы данные.

print "Done!"
if __name__ == '__main__':
main()


И, в общем, хотелось бы еще, чтобы можно всё это в любой момент можно было бы остановить. Но поскольку мы выполнение программы блокируем с помощью queue.join(), то скрипт наши Ctr+C банально "не услышит". Я начал искать, что делать в таком случае, нашел вот такое решение, но у меня оно работало абы как. А потом, поразмыслив, я сам придумал решение, которое оказалось ужасно простым.


# -*- coding: utf-8 -*-
import threading, Queue, time
import traceback



class Worker(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.__queue = queue

self.kill_received = False # флаг прекращения работы
def run(self):
while not self.kill_received:

try: item = self.__queue.get_nowait() # ждём данные
except Queue.Empty: break

try: self.work(item)
except Exception: traceback.print_exc()

time.sleep(0.5)
self.__queue.task_done() # задача завершена
self.__queue.put(item) # зациклим
return
def work(self,item):
print item


def main():
queue = Queue.Queue()
num_threads = 5 # 5 потоков

threads = []
for x in xrange(100):
queue.put(x) # заносим данные в очередь
for i in xrange(num_threads):
t = Worker(queue) # создаем нить
threads.append(t)
t.start() # стартуем
time.sleep(0.1)


#Пока в "живых" не останется только главный поток, ждем.
while threading.activeCount()>1:
try:
time.sleep(1)
except KeyboardInterrupt:
print "Ctrl-c received! Sending kill to threads..."
for t in threads:
t.kill_received = True # даем сигнал о завершении всем потокам
print "Done!"
if __name__ == '__main__':
main()