张家港快速网站建设,wordpress信息填写,跳转网站代码,口碑好的无锡网站建设文章目录一、简介1.1 多线程的特性1.2 GIL二、线程1.2 单线程1.3 多线程三、线程池3.1 pool.submit3.2 pool.map四、Lock#xff08;线程锁#xff09;4.1 无锁导致的线程资源异常4.2 有锁五、Event#xff08;事件#xff09;5.1 简介5.2 示例六、Queue#xff08;队列线程锁4.1 无锁导致的线程资源异常4.2 有锁五、Event事件5.1 简介5.2 示例六、Queue队列6.1 简介6.2 生产者 消费者七、Condition条件锁7.1 简介7.2 notify 单任务通信7.3 notify_all 多任务通信八、Semaphore信号量8.1 简介8.2 示例一、简介
说起python线程说少也少比如线程怎么启动获取结果阻塞等还有线程池的两种运行方式以及使用的一些案例。说多的话又会涉及到LockRlockQueueConditionEvent等很多东西。
这边博客先留着慢慢写主要介绍线程和线程池的使用后面吧所有线程内部的东西都深入说一下为什么要用到这些。
1.1 多线程的特性
Python多线程是Python的一个重要特性它允许程序同时执行多个线程。Python中的线程是轻量级的它们共享内存空间因此创建和销毁线程的开销很小。
多线程切换的效率是以时间为指标的因为线程切换需要保存当前线程的状态并加载下一个线程的状态这个过程需要花费一定的时间。在多线程切换的过程中如果线程的数量过多那么线程切换的时间就会占用大量的CPU时间从而导致程序的执行效率降低。因此在编写多线程程序时需要合理的控制线程的数量避免线程切换的时间过长。
1.2 GIL
Python GIL Global Interpreter Lock是Python解释器的一个特性它是一种互斥锁用于保护Python解释器的内部数据结构。在任何时刻只有一个线程可以执行Python字节码。这意味着即使在多核CPU上运行Python程序也只能使用一个核。
这个特性对于CPU密集型任务来说是一个瓶颈因为它不能充分利用多核CPU的优势。但是对于I/O密集型任务来说Python GIL并不是一个问题因为在I/O操作期间Python解释器会释放GIL以便其他线程可以执行Python字节码。如果你想充分利用多核CPU可以使用多进程或者使用其他语言编写CPU密集型任务的代码。
二、线程
1.2 单线程
import threading
import requestsdef task(url):resp requests.get(urlurl).textprint(len(resp))thread threading.Thread(targettask, args(https://www.baidu.com, ))
thread.start() # 启动这个线程
thread.join() # 阻塞线程print(end)1.3 多线程
这里有个点记一下join是会阻塞任务的只有当线程全部都跑完才会向下执行如果你需要在执行线程的时候响应外部请求那么只start即可。
import threading
import requestsdef task(url):resp requests.get(urlurl).textprint(furl length: , len(resp))urls [fhttps://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{num}.png for num in range(1, 6)]
threads []
for url in urls:thread threading.Thread(targettask, args(url, ))thread.start()threads.append(thread)for thread in threads:thread.join()三、线程池
线程池主要是解决多任务并发的实效性问题简单的说就是让任务跑的很快系统资源利用率更高。线程池对于运行多个线程的优点主要就是线程的启动和关闭有资源的开销而线程池则可以复用线程。你就当作TCP中客户端TIME-WAIT状态的连接重新用于新的TCP连接一个道理。本质都是为了减少资源开销。
那为啥要用线程池而不用进程池区别就是
线程池中的线程启动开销更低切换也更快线程池主要是为了解决IO的问题文件IO网络IO等进程池主要是为了解决CPU密集的问题数据运算数据处理等
python中线程池的库是concurrent.futures线程池有两种使用方式submit和map
3.1 pool.submit
下面是我业务场景中要通过线程池启动两个线程并且要将返回的值拿过来对用户进行响应
一个是获取阿里云SLB的QPS 一个是获取阿里云SLB的RT
这个案例很好说明为什么我要用线程池中的 pool.submit 方法而不是 pool.map结论是都能实现submit更方便
我的任务很少不去太考虑线程的重用和开销问题明确知道需要两个指标一个是QPS一个是RTpool.submit的结果可以通过result()直接拿到pool.map 返回的是一个generator对象用起来会更麻烦
# submit获取结果
qps resp_qps.result()
rt resp_rt.result()# map获取结果
results pool.map(func, data)
for result in results:print(result)from concurrent.futures import ThreadPoolExecutor
pool ThreadPoolExecutor()def get_slb_metric(ali_obj, metric_name, delay, dimensions): 获取阿里云指标数据 period 60namespace acs_slb_dashboardtimestamp int(time.time())start_time timestamp_to_str(timestamptimestamp - delay - period)end_time timestamp_to_str(timestamptimestamp - delay)response ali_obj.describe_metric_data_request(namespacenamespace,metric_namemetric_name,start_timestart_time,end_timeend_time,periodperiod,dimensionsdimensions,)return json.loads(response[Datapoints])def get_slb_metric_data(ali_obj, delay, dimensions): 获取阿里云指标数据公共方法 resp_metric {success: False,msg: None,retry: 0,data: {qps: None,rt: None,}}while resp_metric[retry] 5:resp_metric[retry] 1resp_qps pool.submit(get_slb_metric, ali_obj, Qps, delay, dimensions)resp_rt pool.submit(get_slb_metric, ali_obj, Rt, delay, dimensions)qps resp_qps.result()rt resp_rt.result()if qps and rt:resp_metric[success] Trueresp_metric[data][qps] qps[0][Average]resp_metric[data][rt] rt[0][Average]return resp_metricdelay 10resp_metric[msg] f请求异常: \n\n qps或rt数据为空请延长周期最后一次delay为: {delay}s, 递增10s请求5次未成功获取到数据return resp_metric3.2 pool.map
那什么时候用pool.map以我的习惯是当这个任务是固定的在批量处理数据的情况下pool.map特别方便。
需要处理的任务非常多需要考虑线程开销且复用对于任务的执行结果不需要按类型来区分批量获取
那具体用在什么场景下呢下面是我一个业务场景ECI配置中心里面记录了一条条项目的配置信息且ECI项目的POD数量是动态的里面记录了获取POD的URL需要实时请求。eci_list的方法需要读取配置中心
这个时候我的任务数是不固定的有多少个业务需要ECI那就有多少个URL需要去请求我总不能一个一个手动去创建线程并且所有业务URL的接口给我返回的数据格式是固定的我也只会取固定的值那么pool.map就非常方便了。批量请求接口批量处理结果。
def get_pod_info(data):# 获取项目信息resp_data {success: False,msg: None,data: data}try:url data.get(url)headers {contene-type: application/json}project data.get(project)profile data.get(profile)sign data.get(sign)req_data {sign: sign,project: project,profile: profile}resp requests.post(urlurl, headersheaders, datajson.dumps(req_data))if resp.status_code 200:resp_data[data][pod] 100resp_data[data][open] Trueresp_data[success] Trueelse:resp_data[msg] fECI业务接口异常: {resp.text}\n\n地址: {url}\n\n项目: {project}\n\n标签: {profile}except Exception as err:resp_data[msg] fECI运维接口异常: {err}\n\n地址: {url}\n\n项目: {project}\n\n标签: {profile}finally:return resp_datadef eci_list():if request.method GET:resp_data {success: False,msg: None,data: None}try:# 获取项目列表queryset OpsEci.query.all()data list()for obj in queryset:_obj_host CloudHost.query.filter_by(resource_typekubernetes, instance_idobj.kubernetes).first()_kube_host re.search(\d\.\d\.\d\.\d, _obj_host.private_ip[0]).group()_data {project: obj.project,profile: obj.profile,kubeconfig: f/home/tomcat/.kube/kubeconfig/{_kube_host},token: obj.token,address: obj.address,url: obj.url,sign: obj.sign}data.append(_data)# 处理项目列表with ThreadPoolExecutor() as pool:results pool.map(get_pod_info, data)results_list list()for result in results:project result[data][project]profile result[data][profile]app project if profile jst else f{project}-{profile}if result.get(success):results_list.append({status: True,open: result[data][open],pod: result[data][pod],ding_token: result[data][token],app: app,kubeconfig_path: result[data][kubeconfig]})else:results_list.append({status: False,app: app,})resp_data[success] Trueresp_data[data] results_listexcept Exception as err:resp_data[msg] f获取ECI项目信息异常: {err}finally:return resp_data
四、Lock线程锁
4.1 无锁导致的线程资源异常
当线程没有锁以后不同的线程使用共享资源会出现不可预估的后果
我们期望的情况
王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余: 200
取钱失败: 余额: 200可能会出现的情况
王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余: 200
取钱成功, 剩余: -600无锁代码这里为了能稳定复现特别加了sleep
from concurrent.futures import ThreadPoolExecutor
import threading
import timepool ThreadPoolExecutor()def bank(amount):global balanceprint(f王二狗第{threading.current_thread().name}次取钱)if balance amount:time.sleep(0.1)balance balance - amountprint(取钱成功, 剩余: , balance)else:print(取钱失败: 余额: , balance)if __name__ __main__:balance 1000t1 threading.Thread(targetbank, args(800,), name1)t2 threading.Thread(targetbank, args(800,), name2)t1.start()t2.start()4.2 有锁
如果在使用线程共享资源的时候给资源加上锁那么我们每次运行的结果都是一致的
王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余: 200
取钱失败: 余额: 200有锁代码在操作线程共享资源的时候给资源上锁
import threading
import timepool ThreadPoolExecutor()
lock threading.Lock()def bank(amount):global balanceprint(f王二狗第{threading.current_thread().name}次取钱)with lock:if balance amount:time.sleep(0.1)balance balance - amountprint(取钱成功, 剩余: , balance)else:print(取钱失败: 余额: , balance)if __name__ __main__:balance 1000t1 threading.Thread(targetbank, args(800,), name1)t2 threading.Thread(targetbank, args(800,), name2)t1.start()t2.start()五、Event事件
5.1 简介
Event是python中的一个同步原语用于线程之间的通信。event有两种状态分别是set和clear。当event处于set状态时调用wait方法的线程会立即返回否则会一直阻塞直到event被set。
5.2 示例
下面例子中创建了一个event事件两个人worker。worker_b调用了event.wait()方法这会使worker线程阻塞直到event被set。当worker_a开始运行后将event置为set后worker_b结束阻塞开始运行。
import threading
import timeevent threading.Event()def worker_a():print(f{time.time()}: worker_a 等待运行)print(f{time.time()}: worker_a 开始运行)event.set()def worker_b():print(f{time.time()}: worker_b 等待运行)event.wait()print(f{time.time()}: worker_b 开始运行)if __name__ __main__:t_a threading.Thread(targetworker_a)t_b threading.Thread(targetworker_b)t_b.start()time.sleep(1)t_a.start()执行结果
1679301511.736171: worker_b 等待运行
1679301512.741683: worker_a 等待运行
1679301512.741767: worker_a 开始运行
1679301512.741956: worker_b 开始运行六、Queue队列
6.1 简介
Python中的Queue模块提供了同步的、线程安全的队列类包括FIFO先进先出)队列QueueLIFO后进先出队列LifoQueue和优先级队列PriorityQueue。这些队列都实现了锁原语能够在多线程中直接使用。
6.2 生产者 消费者
下面是一个使用Queue模块实现多线程的示例其中包括了生产者和消费者两个线程生产者向队列中添加元素消费者从队列中取出元素而他们都是用队列queue.Queue()除此之外还有queue.LifoQueuequeue.PriorityQueue。
import threading
import queue
import timeclass Producer(threading.Thread):def __init__(self, queue):threading.Thread.__init__(self)self.queue queuedef run(self):for i in range(100):self.queue.put(i)time.sleep(0.3)print(producer put end)class Consumer(threading.Thread):def __init__(self, queue):threading.Thread.__init__(self)self.queue queuedef run(self):while True:if self.queue.empty():time.sleep(1)print(queue is empty, waiting ...)else:print(fconsumer get {self.queue.get()})time.sleep(0.1)if __name__ __main__:q queue.Queue()producer Producer(q)consumer Consumer(q)producer.start()consumer.start()七、Condition条件锁
7.1 简介
在 Python 中可以使用 threading.Condition 实现条件锁。Condition 对象提供了 acquire() 和 release() 方法与 Lock 对象的方法类似。此外Condition 对象还提供了 wait()、notify() 和 notify_all() 方法用于线程间的协调。具体使用方法可以参考 Python 官方文档中的 threading.Condition 部分。
在 threading.Condition 中wait() 方法会释放锁并挂起当前线程直到另一个线程调用 notify() 或 notify_all() 方法唤醒它。notify() 方法会随机唤醒一个挂起的线程而 notify_all() 方法会唤醒所有挂起的线程。需要注意的是wait() 方法只能在已经获得锁的情况下调用否则会抛出 RuntimeError 异常。
在使用 threading.Condition 时通常需要先获得一个 Lock 对象然后使用这个 Lock 对象创建一个 Condition 对象。在需要等待某个条件时调用 Condition 对象的 wait() 方法在满足条件时调用 notify() 或 notify_all() 方法唤醒等待的线程
7.2 notify 单任务通信
import threading
import timecondition threading.Condition()class Master(threading.Thread):主任务类执行过后等待子任务响应def __init__(self, name, condition):super().__init__(namename)self.name nameself.cond conditiondef run(self):with self.cond:print(self.name, -----任务开始-----)print(self.name, 事件A处理完毕, 等待worker响应...)self.cond.notify()self.cond.wait()print(self.name, 事件B处理完毕, 等待worker响应...)self.cond.notify()self.cond.wait()print(self.name, 事件C处理完毕, 等待worker响应...)self.cond.notify()self.cond.wait()print(self.name, 事件D处理完毕, 等待worker响应...)self.cond.notify()self.cond.wait()print(self.name, -----任务结束-----)class Worker(threading.Thread):子任务类等待主任务通知并响应def __init__(self, name, condition):super().__init__(namename)self.name nameself.cond conditiondef run(self):with self.cond:self.cond.wait()print(self.name, 事件A已响应, 请继续)self.cond.notify()self.cond.wait()print(self.name, 事件B已响应, 请继续)self.cond.notify()self.cond.wait()print(self.name, 事件C已响应, 请继续)self.cond.notify()self.cond.wait()print(self.name, 事件D已响应, 请继续)self.cond.notify()if __name__ __main__:master Master(master, condition)worker Worker(worker, condition)worker.start()time.sleep(1)master.start()执行结果
master -----任务开始-----
master 事件A处理完毕, 等待worker响应...
worker 事件A已响应, 请继续
master 事件B处理完毕, 等待worker响应...
worker 事件B已响应, 请继续
master 事件C处理完毕, 等待worker响应...
worker 事件C已响应, 请继续
master 事件D处理完毕, 等待worker响应...
worker 事件D已响应, 请继续
master -----任务结束-----7.3 notify_all 多任务通信
import threading
import timecondition threading.Condition()class Master(threading.Thread):主任务类执行过后等待子任务响应def __init__(self, name, condition):super().__init__(namename)self.name nameself.cond conditiondef run(self):with self.cond:print(self.name, 前置准备工作结束, 通知子任务开始任务...)time.sleep(1)self.cond.notify_all()class Worker(threading.Thread):子任务类等待主任务通知并响应def __init__(self, name, condition):super().__init__(namename)self.name nameself.cond conditiondef run(self):with self.cond:print(self.name, 准备就绪, 等待调度...)self.cond.wait()print(self.name, 接收到主任务通知, 开始执行任务)print(self.name, 任务A执行完成)print(self.name, 任务B执行完成)print(self.name, 任务C执行完成)if __name__ __main__:master Master(master, condition)worker_a Worker(worker-a, condition)worker_b Worker(worker-b, condition)worker_c Worker(worker-c, condition)worker_a.start()worker_b.start()worker_c.start()time.sleep(0.3)master.start()worker-a 准备就绪, 等待调度...
worker-b 准备就绪, 等待调度...
worker-c 准备就绪, 等待调度...
master 前置准备工作结束, 通知子任务开始任务...
worker-a 接收到主任务通知, 开始执行任务
worker-a 任务A执行完成
worker-a 任务B执行完成
worker-a 任务C执行完成
worker-b 接收到主任务通知, 开始执行任务
worker-b 任务A执行完成
worker-b 任务B执行完成
worker-b 任务C执行完成
worker-c 接收到主任务通知, 开始执行任务
worker-c 任务A执行完成
worker-c 任务B执行完成
worker-c 任务C执行完成八、Semaphore信号量
8.1 简介
Semaphore用于控制对共享资源的访问。semaphore维护一个内部计数器。该计数器可以通过 acquire() 和 release() 两个方法来增加和减少。当计数器为0时acquire() 方法将会被阻塞直到其他线程调用 release() 方法位置。
semaphore.acquire() 将会使计数器-1当计数器为0则会阻塞当前线程 semaphore.release() 将会时计数器1以便有更多的资源去使用计数器
8.2 示例
下面模拟10个任务运行的情况同时运行三个线程通过Semaphore进行控制线程数。可以发现通过semaphore即可控制线程的worker
import threading
import timesemaphore threading.Semaphore(3)def task():任务机with semaphore:print(time.strftime(%H:%M:%S), threading.current_thread().name, 开始执行...)time.sleep(2)if __name__ __main__:for i in range(10):t threading.Thread(targettask)t.start()执行结果
18:12:05 Thread-1 开始执行...
18:12:05 Thread-2 开始执行...
18:12:05 Thread-3 开始执行...
18:12:07 Thread-4 开始执行...
18:12:07 Thread-5 开始执行...
18:12:07 Thread-6 开始执行...
18:12:09 Thread-7 开始执行...
18:12:09 Thread-9 开始执行...
18:12:09 Thread-8 开始执行...
18:12:11 Thread-10 开始执行...