大模型学习-python基础Day11
1.并发与并行并发单个cpu处理多个任务。各个任务交替执行一段时间并行多个cpu同时执行各个认为2.多进程进程进程是操作系统进行资源分配的基本单位。每个进程有自己独立的一块内存空间。多进程就是在操作系统中执行多个进程。使用multiprocessing.process创建进程multiprocessing.Process是 Python 中用于创建和管理进程的核心类。以下是一个完整的示例展示如何创建并启动一个子进程。multiprocessing.Process(group None, target none, name none, args (), kwargs {}, *, daemon none)import multiprocessing import os def worker(): print(f子进程 ID: {os.getpid()}, 父进程 ID: {os.getppid()}) if __name__ __main__: print(f主进程 ID: {os.getpid()}) p multiprocessing.Process(targetworker) p.start() p.join()关键参数说明target: 指定子进程要执行的函数。由run()方法来发起调用的可调用对象。group:保留参数始终设为None为未来扩展预留。当前版本未实现功能daemon:布尔值控制进程是否为守护进程。守护进程会随主进程退出而终止且不允许创建子进程。默认为none则继承父进程。args: 以元组形式传递参数给目标函数。kwargs: 以字典形式传递关键字参数。name: 为进程设置名称可通过p.name查看。Process的属性和方法name进程的名称可通过赋值修改。常用于标识进程用途pid返回进程的ID号类型为整数唯一标识一个进程。daemon布尔值表示是否为守护进程。主线程退出时守护进程会自动终止。exitcode进程退出时的状态码。运行中为None正常退出为0被信号终止为负数。authkey进程的身份验证密钥用于网络连接时的安全验证默认为os.urandom()生成。start()启动进程调用后系统会创建新的进程并执行run()方法。每个进程只能调用一次。run()默认调用传入target的对象定义进程执行的任务逻辑。通常通过继承Process类并重写此方法实现自定义功能。join(timeoutNone)阻塞主进程等待子进程结束。timeout参数指定超时时间秒超时后继续执行。terminate()强制终止子进程。可能产生资源未释放的问题建议优先使用正常结束逻辑。is_alive()返回布尔值检查进程是否仍在运行。可用于监控进程状态。close()释放进程资源。仅当未调用start()或进程已终止时可用。os.getpid():获取当前进程编号os.getppid()获取当前进程的父进程编号示例文件同时读写import multiprocessing import time #同时对读写文件进行操作 def write_file(): with open(text.txt,a) as f: while true: f.write(hello world\n) #手动将缓冲区数据刷写到文件 f.flush() time.sleep(0.5) def read_file(): with open(text.txt,r) as f: while true: time.sleep(0.5) print(f.readline()) if __name__ __main__: #让当前两函数同时执行 p1 multiprocessing.Process(target write_file) p2 multiprocessing.Process(target read_file) p1.start() p2.start()若直接调用multiprocessing相关函数而不加保护条件子进程在初始化时会再次执行模块级代码形成循环。if __name__ __main__确保代码仅在主进程中执行。自定义Process子类创建对象import os import multiprocessing class Worker(multiprocessing.Process): def fun(self): print(进程idos.getpid(),\t父进程idos.getppid()) if __name__ __main__: #创建多个进程对象 for i in range(5): p Worker() p.start()通过进程池创建进程对象multiprocessing.Pool( processesNone, initializerNone, initargs(), maxtasksperchildNone, context )processes类型整数或None作用指定进程池中的工作进程数量。默认值None表示使用os.cpu_count()返回的 CPU 核心数。initializer类型可调用对象如函数或None作用每个工作进程启动时执行该函数。默认值None表示不执行任何初始化操作。initargs类型元组作用传递给initializer函数的参数。默认值空元组()。maxtasksperchild类型整数或None作用每个工作进程完成指定数量的任务后会被重启避免内存泄漏。一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量。默认值None表示工作进程寿命与池对齐。context作用可被用于指定启动的工作进程的上下文。通常一个进程池是使用函数multiprocessing.Pool或者上一个对象的pool方法创建的注意⚠️进程池对象的方法只有创建它的进程能够调用使用时候一般只指定process参数进程池常用方法apply语法apply(func, args(), kwds{})作用同步调用函数阻塞直到返回结果。apply_async语法apply_async(func, args(), kwds{}, callbackNone, error_callbackNone)作用异步调用函数返回AsyncResult对象。map语法map(func, iterable, chunksizeNone)作用并行处理可迭代对象返回结果列表。map_async语法map_async(func, iterable, chunksizeNone, callbackNone, error_callbackNone)作用异步版本的map返回AsyncResult对象。close语法close()作用关闭进程池禁止提交新任务。terminate语法terminate()作用立即终止所有工作进程。join语法join()作用阻塞主进程等待所有工作进程结束需在close或terminate后调用。案例import os import time import multiprocessing def func(): for i in range(10) print(os.getpid(),i) time.sleep(0.5) if __name__ __main__: #指定进程池大小 process_num 5 pool multiprocessing.Pool(process_num) for p in range(process_num): #交任务给进程池 阻塞式p.apply(func) #非阻塞式 pool.apply_async(func)#异步 pool.close() pool.join()进程间通信进程间不共享全局变量子进程向传入的列表中添加元素最终发现主进程与子进程之间的列表结果不同import os def func(list1): for i in range(10): list1.append(i) print(os.getpid(),list1) if __name__ __main__: list1 [] p1 multiprocessing.Process(target func, args (list1,)) p2 multiprocessing.Process(target func, args (list1,)) p1.start() p2.start() p1.join() p2.join() print(os.getpid(),list1) print(f当前进程名{multiprocessing.current_process().name},list1)全局变量list1没有被p1和p2共享使用Queue通信——可以实现多个进程之间共享python的multiprocessing模块包装底层的机制提供了Queue、pipes等多种方式来交换数据。multiprocessing.Queue([maxsize])返回一个使用一个管道和少量锁和信号量实现的共享队列实例。当一个进程将一个对象放进队列中时一个写入线程会启动并将对象从缓冲区写入管道中。默认队列是无限大小的可以通过maxsize参数限制。Queue常用方法qsize返回队列大致长度。由于多线程或者多进程上下文这个数字是不可靠的。empty如果队列是空的返回True。由于多线程或者多进程上下文这个数字是不可靠的。full如果队列是满的返回Trueputobj[, block[, timeout[:将obj放入队列。blocktrue而且timeoutnone将会阻塞当前进程指导有空的缓冲槽。如果timeout是正数将会阻塞了最多timeout秒之后还是没有可用的缓冲槽时抛出queue.Full异常。反之block false仅当有可用缓冲槽时才放入对象否则抛出queue.Full异常put_nowait(obj):相当于putobjfalseget([block[, timeout]])):blocktrue而且timeoutnone将会阻塞当前进程直到队列出现可用对象。如果timeout是正数将会阻塞了最多timeout秒之后还是没有可用的对象时抛出queue.empty异常。反之block false仅当有可用对象能够取出时才放入对象否则抛出queue.empty异常get_nowait(obj):相当于getfalse案例import os import multiprocessing import random #放数据 def func1(queue): while true: queue.put(random.randint(1,50)) time.sleep(0.5) #取数据 def fun2(queue): while true: print(*queue.get()) if __name__ __main__: queue multiprocessing.Queue() p1 multiprocessing.Process(target func1,args (queue,)) p2 multiprocessing.Process(target func2,args (queue,)) p1.start() p2.start() p1.join() p2.join()使用进程池——要使用Manager.Queueif __name__ __main__: qu multiprocessing.Manager().Queue() pool multiprocessing.Pool(2) pool.apply_async(func1,(queue,)) pool.apply_async(func2,(queue,)) pool.close() pool.join()3.多线程线程是处理器任务调度和执行的基本单位一个进程至少有一个线程也可以运行多个线程多个线程之间可以共享数据线程运行出错异常后如果没有捕获会导致整个进程崩溃多线程是指在同一个进程中同时执行多个任务使用threading.Thread创建线程python提供了两个模块_thread和threading_thread是低级模块threading是高级模块对_thread的封装。绝大多数下我们只使用threading这个高级模块Thread的创建threading.Thread(group none, target none, name none, args (), kwargs {}, * , daemon none)group保留参数当前未使用默认值为None。为未来扩展预留通常无需指定。target指定线程要执行的函数或方法。必须是可调用对象如函数名默认None表示无操作。name设置线程名称便于调试和日志记录。若不指定Python自动生成Thread-N格式的名称N为数字。args以元组形式传递给target函数的位置参数。例如args(1, 2)会解包为target(1, 2)。kwargs以字典形式传递给target函数的关键字参数。例如kwargs{x: 1}等价于target(x1)。daemon布尔值控制线程是否为守护线程。若为True主线程退出时会强制结束该线程默认None继承当前线程的守护状态。thread的属性和方法与其他常用方法name线程名称用于标识线程。可通过构造函数或直接赋值设置。ident线程标识符。线程启动后由系统分配未启动时为None。daemon布尔值表示是否为守护线程。主线程退出时守护线程会自动终止。需在start()前设置。is_alive()返回线程是否正在运行。线程启动后且未终止时为True。start()启动线程调用run()方法。每个线程只能调用一次。run()定义线程执行逻辑。默认调用传递给构造函数的target函数。子类可重写此方法。join(timeoutNone)阻塞当前线程直到目标线程结束或超时。timeout为可选参数秒。threading.current_thread()返回当前线程对象。threading.enumerate()返回所有存活线程的列表包括主线程和守护线程。threading.active_count()返回当前存活的线程数量。案例两线程分别交替打印import time import threading #交替打印0000和1111 def func(): flag 0 while true print(threading.current_thread().name, f{flag}*4) flag flag^1 #替换1和0 time.sleep(0.5) if __name__ __main__: t1 threading.Thread(Target func, name 线程1) t2 threading.Thread(Target func, name 线程2) t1.start() t2.start()自定义Thread线程子类创建线程import threading class Worker(threading.Thread): def run(self): flag 0 while True: print(f线程{threading.current_thread().name},f{flag}*5) flag flag^1 time.sleep(0.6) if __name__ __main__: t1 Worker(name thread1) t2 Worker(name thread2) t1.start() t2.start()线程池ThreadPoolExecutor 是concurrent.futures模块中的线程池实现它允许我们轻松地提交任务到线程池并管理任务的执行和结果。线程池的创建concurrent.futures.ThreadPoolExecutor(max_workers none, thread_name_prefix ,initializer none, initargs ())max_workers指定线程池中最大工作线程数。默认值为None此时会根据系统CPU核心数自动设置通常为os.cpu_count() * 5。若显式设置为整数则严格限制线程数量。thread_name_prefix为线程池中的线程设置名称前缀。默认空字符串线程名称为ThreadPoolExecutor-{数字}。自定义前缀可帮助调试时区分不同线程池的线程。initializer接收一个可调用对象如函数在每个线程启动时执行初始化。默认None表示不进行初始化。适用于线程本地资源如数据库连接的预配置。initargs以元组形式传递initializer函数的参数。默认None表示无参数。需与initializer配合使用例如初始化时传递配置路径。线程池常用方法submit(Runnable task)提交任务并返回Future?可通过Future.get()等待任务完成。shutdown()平缓关闭线程池等待已提交任务完成不再接受新任务。shutdownNow()立即尝试终止所有任务返回未执行的任务列表。def func(tname): global word for i, char in enumerate(word): word[i] chr(ord(char)^1) print(f{tname} : {word}\n, end ) return word for i, char in enumerate(word): 遍历 word 的每个字符i 为索引char 为当前字符。 word[i] chr(ord(char)^1) 对每个字符执行以下操作 ord(char)获取字符的 Unicode 码点。 ^1对码点进行按位异或XOR运算参数 1 表示二进制最后一位取反。 chr(...)将运算后的码点转回字符。 将结果写回 word[i]直接修改原字符串的字符。 print(f{tname} : {word}\n, end) 格式化输出 tname函数参数用于标识输出来源。 word每次修改后的完整字符串。 end禁止换行确保多次调用时输出紧凑。 if __name__ __main__: word list(idmmn!vnsme) #使用with语句来确保线程被迅速处理 with concurrent.futures.ThreadPoolExecutor(max_worker 3) as executor: future1 executor.submit(fun,thread1) future2 executor.submit(fun,thread2) future3 executor.submit(fun,thread3) word future1.result() word future2.result() word future3.result() print(.join(word))代码执行逻辑初始化字符串将字符串idmmn!vnsme转换为列表word便于修改单个字符。线程池创建使用ThreadPoolExecutor创建最多3个线程的线程池。线程任务提交每个线程执行func函数传入线程名称如thread1。字符处理每个线程遍历word对每个字符执行异或运算并更新列表。结果合并主线程等待所有线程完成最终输出处理后的字符串。互斥锁线程安全问题线程之间共享数据会存在线程安全的问题举例多个线程同时递增一个共享计数器由于操作非原子性可能导致计数错误。import threading counter 0 def increment(): global counter for _ in range(100000): counter 1 threads [] for _ in range(10): t threading.Thread(targetincrement) threads.append(t) t.start() 创建10个线程每个线程执行increment函数。 将线程对象存入列表threads并立即启动线程。 for t in threads: t.join() print(fExpected: 1000000, Actual: {counter})互斥锁的概念互斥锁Mutex全称 Mutual Exclusion是一种同步机制用于控制多个线程或进程对共享资源的访问。其主要目的是防止并发访问导致的数据竞争或不一致问题。某个线程要更改共享数据时先将其锁定此时其他线程不能更改。知道该线程释放资源将资源的状态变成“非锁定”其他的线程才可以再次锁定该资源。互斥锁的使用可以通过threading.Lock()创建互斥锁使用lock.acquire([blocking True][,timeout 1])来获取锁blocking为true线程会阻塞知道获取到锁如果为false线程立即返回。获取锁成功返回True否则返回false。timeout为等待时间超时未获取锁则返回false使用lock.release()释放锁def func(): global g_num for _ in range(10): lock.acquire() g_num1 time.sleep(0.2) print(f当前线程{threading.current_thread().name}, g_num) lock.release() if __name__ __main__: g_num 0 lock threading.Lock() threads [threading.Thread(target func, name 线程 str(i)) for i in range (1,4)] [t.start() for t in threads] [t.join() for t in threads] print(f当前线程{threading.current_thread().name}, g_num) g_num 0初始化全局变量g_num为0。 lock threading.Lock()创建线程锁对象用于同步线程。 thread [...]生成3个线程每个线程命名为“线程1”、“线程2”、“线程3”目标函数为func。 [t.start() for t in threads]启动所有线程注意变量名应为thread而非threads此处为代码错误。 [t.join() for t in threads]等待所有线程执行完毕变量名错误同上。 print(...)主线程打印最终的g_num值。 买票系统举例import threading import time def sale_ticket(): global ticket_num while True: lock.acquire() if ticket_num 0 lock.release() break time.sleep(0.01) ticket_num - 1 printf(f当前{thread.current_thread().name}卖了一张票还剩{ticket_num}) lock.release() if __name__ __main__: ticket_num 100 lock threading.Lock() threads [threading.Thread(Target sale_ticket, name windows str(i)) for i in range(1,4)] [t.start() for t in threads] [t.join() for t in threads]GILGILGlobal Interpreter Lock是 Python 解释器中的一个全局锁机制用于同步线程的执行。它确保同一时刻只有一个线程可以执行 Python 字节码这意味着在任何时间点都只能有一个线程处于执行状态从而简化了 CPython 解释器的内存管理设计。GIL 的主要目的是保护 Python 对象免受多线程并发访问导致的内存管理问题。CPython 使用引用计数进行垃圾回收而 GIL 避免了多个线程同时修改引用计数引发的竞争条件。4.进程和线程对比区别资源分配进程拥有独立的内存空间和系统资源每个进程都有自己的代码段、数据段和堆栈等。而线程共享所属进程的内存空间和资源同一进程内的线程之间可以直接访问共享内存。开销创建进程需要分配独立的内存、打开文件等系统资源开销较大创建线程只需所属进程的内存空间进行少量资源分配开销较小并发行多核心cpu环境下进程和线程都可以异步执行但进程之间的异步是真正的异步而线程之间的异步在单核心cpu上是通过时间片轮转实现的“伪异步”可靠性进程间相互隔离一个进程的错误不会扩散系统稳定性更高。线程共享内存一个线程的错误可能影响其他线程甚至导致整个进程崩溃。通信与同步进程间通信IPC需通过管道、消息队列、共享内存等机制实现复杂且速度较慢线程间可直接读写共享数据但需通过锁、信号量等机制避免竞态条件编程复杂度较高。适合使用多线程的情况I/O密集型任务程序需要频繁等待磁盘、网络或数据库响应时多线程可避免阻塞主线程。例如Web服务器处理并发请求或爬虫同时下载多个网页线程在等待I/O时可切换执行其他任务提高资源利用率。共享数据进程间通信IPC成本高于线程间通信。若任务需要频繁共享或修改同一数据多进程可能导致性能下降。适合使用多进程的勤情况CPU密集型任务当任务需要大量计算资源且受限于单个CPU核心时多进程能有效利用多核CPU的并行计算能力。例如科学计算、图像处理、机器学习模型训练等场景通过多进程可显著缩短计算时间。避免GIL限制Python特有在Python中由于全局解释器锁GIL的存在多线程无法真正并行执行CPU密集型任务。多进程可以绕过GIL限制每个进程拥有独立的Python解释器和内存空间。任务独立性高多进程适合处理相互独立、无需频繁共享数据的任务。例如批量文件处理、网页爬虫等场景每个进程可独立处理部分任务最后合并结果。稳定性要求高多进程中单个进程崩溃不会影响其他进程的运行适合需要高稳定性的场景。例如长时间运行的服务可通过多进程隔离风险。