[并发编程专题]多线程
线程是“执行线程”(thread of execution)的缩写。程序员可以把工作拆分到多个线程中,这些线程同时运行并共享同一内存上下文。
多线程应在多处理器或者多核机器上执行,使每个线程在独立的CPU核上并行运行,从而使程序更快。
竞争冒险:
如果两个线程在没有任何保护的情况下更新相同的数据,则会发生竞态条件。
而锁机制有助于保护数据,在多线程编程中总要确保以安全的方式访问资源。
死锁:
两个线程各自锁定一个资源,并尝试获取对方锁定的资源,它们将永远彼此等待,这种情况叫死锁。
python多线程
多线程应用案例:假设需要使用多个查询从一些web服务器获取数据,这些查询无法通过单个大型的HTTP请求批量处理
使用API的地理编码端点解析。
单线程:
import time
api = { "Reykjavik":"123,231","Vien":"234,342","Zadar":"334,345","Venice":"345,243","Wrocla":"45,34","Bolognia":"895,569","Berlin":"56,23","Slubice":"2,3","NewYork":"345,6"}
PLACES = (
'Reykjavik','Vien','Zadar','Venice','Wrocla','Bolognia','Berlin','Slubice',
'NewYork'
)
def fetch_place(place):
gecode = api[place]
print("place:",place,"gecode:",gecode)
def main():
    """Resolve every place sequentially, one query at a time."""
    for current_place in PLACES:
        fetch_place(current_place)


if __name__ == '__main__':
    main()
多线程:加入线程池。
启动一些预定义数量的线程,从队列中消费工作项,直到完成。当没有其他工作要做,线程将返回。
import time
from queue import Queue,Empty
from threading import Thread
THREAD_POOL_SIZE=4
def worker(work_queue):
    """Consume places from *work_queue* until it is exhausted.

    Uses a non-blocking get so a thread that loses the race for the
    last item exits instead of blocking forever.
    """
    while not work_queue.empty():
        try:
            place = work_queue.get(block=False)
        except Empty:
            break
        fetch_place(place)
        work_queue.task_done()


# Fake geocoding "API": maps a place name to its coordinate string.
api = {
    "Reykjavik": "123,231",
    "Vien": "234,342",
    "Zadar": "334,345",
    "Venice": "345,243",
    "Wrocla": "45,34",
    "Bolognia": "895,569",
    "Berlin": "56,23",
    "Slubice": "2,3",
    "NewYork": "345,6",
}

# Places to resolve, one query each.
PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice', 'Wrocla', 'Bolognia', 'Berlin',
    'Slubice', 'NewYork',
)


def fetch_place(place):
    """Look up *place* in the fake geocoding API and print the result."""
    gecode = api[place]
    print("place:", place, "gecode:", gecode, "\n")
def main():
    """Fan the PLACES queries out to a fixed-size pool of threads."""
    queue = Queue()
    for place in PLACES:
        queue.put(place)

    pool = [
        Thread(target=worker, args=(queue,))
        for _ in range(THREAD_POOL_SIZE)
    ]
    for member in pool:
        member.start()

    # Wait until every queued item has been marked done, then reap threads.
    queue.join()
    while pool:
        pool.pop().join()


if __name__ == '__main__':
    main()
从打印结果可以看出输出有点问题,更好的方案是启动另外的线程打印,而不是在主线程中进行。我们可以通过一种队列来实现,这个队列主要负责从我们的工作线程收集结果。
import time
from queue import Queue,Empty
from threading import Thread
THREAD_POOL_SIZE=4
def worker(work_queue, result_queue):
    """Resolve places from *work_queue*, pushing results onto *result_queue*.

    Results are collected in a queue so printing can happen outside the
    worker threads.
    """
    while not work_queue.empty():
        try:
            place = work_queue.get(block=False)
        except Empty:
            break
        result_queue.put(fetch_place(place))
        work_queue.task_done()


# Fake geocoding "API": maps a place name to its coordinate string.
api = {
    "Reykjavik": "123,231",
    "Vien": "234,342",
    "Zadar": "334,345",
    "Venice": "345,243",
    "Wrocla": "45,34",
    "Bolognia": "895,569",
    "Berlin": "56,23",
    "Slubice": "2,3",
    "NewYork": "345,6",
}

# Places to resolve, one query each.
PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice', 'Wrocla', 'Bolognia', 'Berlin',
    'Slubice', 'NewYork',
)


def fetch_place(place):
    """Return ``[(place, gecode)]`` looked up in the fake API."""
    return [(place, api[place])]


def present_result(geocode):
    """Print one result produced by fetch_place()."""
    print(geocode)
def main():
    """Run the thread pool, then print collected results in the main thread."""
    todo = Queue()
    done = Queue()
    for place in PLACES:
        todo.put(place)

    pool = [
        Thread(target=worker, args=(todo, done))
        for _ in range(THREAD_POOL_SIZE)
    ]
    for member in pool:
        member.start()

    todo.join()
    while pool:
        pool.pop().join()

    # Only the main thread prints, so the output is not interleaved.
    while not done.empty():
        present_result(done.get())


if __name__ == '__main__':
    main()
处理错误与速率限制
有时候会遇到外部服务提供商施加的速率限制。当客户端超过请求速率时会抛出异常,这个异常只影响当前任务,不会直接导致整个程序崩溃:抛出异常的工作线程会立即退出,而主线程仍在work_queue.join()处等待队列中所有任务被标记完成。我们的线程应该尽可能地处理异常,并确保队列中的所有项目都会被处理。如果不做进一步改进,一旦遇到异常,一些工作线程将崩溃退出,队列中的任务无法全部标记完成,主线程将永远阻塞,程序无法正常退出。
针对以上问题,可以在result_queue队列中放入一个错误实例,并将当前任务标记为已完成,与没有错误时一样。这样,我们确保主线程在work_queue.join()中等待时不会无限期地阻塞。主线程随后可以检查结果,并重新抛出在结果队列中发现的任何异常。
import time
from queue import Queue,Empty
from threading import Thread
THREAD_POOL_SIZE=4
def worker(work_queue, results_queue):
    """Resolve places from *work_queue*; push results onto *results_queue*.

    A failed lookup pushes the exception instance itself, and the item is
    still marked done, so work_queue.join() can never block forever.
    """
    while True:
        try:
            place = work_queue.get(block=False)
        except Empty:
            break
        try:
            result = fetch_place(place)
        except Exception as err:
            # Hand the error to the main thread instead of dying silently.
            results_queue.put(err)
        else:
            results_queue.put(result)
        finally:
            work_queue.task_done()


# Fake geocoding "API": maps a place name to its coordinate string.
api = {
    "Reykjavik": "123,231",
    "Vien": "234,342",
    "Zadar": "334,345",
    "Venice": "345,243",
    "Wrocla": "45,34",
    "Bolognia": "895,569",
    "Berlin": "56,23",
    "Slubice": "2,3",
    "NewYork": "345,6",
}

# Places to resolve, one query each.
PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice', 'Wrocla', 'Bolognia', 'Berlin',
    'Slubice', 'NewYork',
)


def fetch_place(place):
    """Return ``[(place, gecode)]`` looked up in the fake API."""
    return [(place, api[place])]


def present_result(geocode):
    """Print one result produced by fetch_place()."""
    print(geocode)
def main():
    """Run the pool, then print results and re-raise any collected error."""
    todo = Queue()
    results = Queue()
    for place in PLACES:
        todo.put(place)

    pool = [
        Thread(target=worker, args=(todo, results))
        for _ in range(THREAD_POOL_SIZE)
    ]
    for member in pool:
        member.start()

    todo.join()
    while pool:
        pool.pop().join()

    while not results.empty():
        outcome = results.get()
        # Exceptions captured by the workers surface here, in the main thread.
        if isinstance(outcome, Exception):
            raise outcome
        present_result(outcome)


if __name__ == '__main__':
    main()
对工作速度的限制称为节流(throttling)。
令牌桶算法:
- 存在具有预定量的令牌桶
- 每个令牌对应处理一项工作的单个权限
每次工作者要求一个或者多个令牌权限时
– 我们测量从上次重新装满桶以来所经过的时间
– 如果时间差允许,向桶中重新填充与这个时间差相应的令牌量
– 如果存储的令牌的数量大于或者等于请求的数量,我们减少存储的令牌的数量并且返回那个值;
– 如果存储的令牌的数量小于请求的数量,我们返回零
import time
from queue import Queue,Empty
from threading import Thread
from threading import Lock
# Number of worker threads consuming places from the shared work queue.
THREAD_POOL_SIZE = 4


class Throttle(object):
    """Token-bucket rate limiter shared between worker threads.

    *rate* is both the refill speed (tokens per second) and the bucket
    capacity. consume() is guarded by a lock so multiple threads can
    share one instance safely.
    """

    def __init__(self, rate):
        self._consume_lock = Lock()
        self.rate = rate
        self.tokens = 0
        self.last = 0

    def consume(self, amount=1):
        """Try to take *amount* tokens; return *amount* on success, else 0."""
        with self._consume_lock:
            now = time.time()

            # Start measuring on the first request to avoid an initial burst.
            if self.last == 0:
                self.last = now

            elapsed = now - self.last

            # Refill only once at least one whole token has accrued,
            # and never let the bucket hold more than *rate* tokens.
            refill = int(elapsed * self.rate)
            if refill:
                self.tokens = min(self.rate, self.tokens + refill)
                self.last = now

            # Dispatch tokens if enough are available.
            if self.tokens >= amount:
                self.tokens -= amount
                return amount
            return 0
def worker(work_queue, results_queue, throttle):
    """Resolve places from *work_queue*, rate-limited by *throttle*.

    Results (or caught exceptions) go onto *results_queue*; every item is
    marked done so work_queue.join() cannot block forever.
    """
    while True:
        try:
            place = work_queue.get(block=False)
        except Empty:
            break
        # Busy-wait until the throttle grants a token for this request.
        while not throttle.consume():
            pass
        try:
            result = fetch_place(place)
        except Exception as err:
            results_queue.put(err)
        else:
            results_queue.put(result)
        finally:
            work_queue.task_done()


# Fake geocoding "API": maps a place name to its coordinate string.
api = {
    "Reykjavik": "123,231",
    "Vien": "234,342",
    "Zadar": "334,345",
    "Venice": "345,243",
    "Wrocla": "45,34",
    "Bolognia": "895,569",
    "Berlin": "56,23",
    "Slubice": "2,3",
    "NewYork": "345,6",
}

# Places to resolve, one query each.
PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice', 'Wrocla', 'Bolognia', 'Berlin',
    'Slubice', 'NewYork',
)


def fetch_place(place):
    """Return ``[(place, gecode)]`` looked up in the fake API."""
    return [(place, api[place])]


def present_result(geocode):
    """Print one result produced by fetch_place()."""
    print(geocode)
def main():
    """Throttled variant: the pool shares a 10 token/second Throttle."""
    todo = Queue()
    results = Queue()
    throttle = Throttle(10)

    for place in PLACES:
        todo.put(place)

    pool = [
        Thread(target=worker, args=(todo, results, throttle))
        for _ in range(THREAD_POOL_SIZE)
    ]
    for member in pool:
        member.start()

    todo.join()
    while pool:
        pool.pop().join()

    while not results.empty():
        outcome = results.get()
        # Exceptions captured by the workers surface here, in the main thread.
        if isinstance(outcome, Exception):
            raise outcome
        present_result(outcome)


if __name__ == '__main__':
    main()
还没有评论,来说两句吧...