初识多线程多进程

多线程多进程

Posted by BY Bigboss on June 7, 2022

初识多线程多进程

参考文章:

参考网站1

参考网站2

  • 单个CPU在任一时刻只能执行单个线程,只有多核CPU还能真正做到多个线程同时运行
  • 一个进程包含多个线程,这些线程可以分布在多个CPU上
  • 多核CPU同时运行的线程可以属于单个进程或不同进程
  • 所以,在大多数编程语言中因为切换消耗的资源更少,多线程比多进程效率更高(python除外)

GIL锁

由于历史遗留的原因,python中引入了GIL(Global Interpreter Lock),GIL规定,在一个进程中每次只能有一个线程在运行,如果程序想运行,必须获得GIL锁,然后运行结束或者遇到IO操作或者超时的时候会释放GIL锁,给其余的线程去竞争,竞争成功的线程获得GIL锁得到下一次运行的机会。

正是因为有GIL的存在,python的多线程其实是假的,所以才有人说python的多线程非常鸡肋。但是虽然每个进程有一个GIL锁,进程和进程之前还是不受影响的。

多线程与多进程的使用场景选择

  • CPU密集型操作使用多进程比较合适,例如海量运算
  • IO密集型操作使用多线程比较合适,例如爬虫,文件处理,批量ssh操作服务器等等

下面的变量是否共享也对两种方法的选择产生影响。

进程间是相互独立的,不共享内存空间,所以在一个进程中声明的变量在另一个进程中是看不到的。这时候就要借助一些工具来在两个进程间进行数据传输了,其中最常见的就是队列了。与多进程的内存独立不同,多线程间可以共享内存,所以同一个变量是可以被多个线程共享的,不需要额外的插件。想要让多个线程能同时操作某变量,要么将该变量作为参数传递到线程中(必须是可变变量,例如list和dict),要么作为全局变量在线程中用global关键字进行声明

多进程案例

from multiprocessing import Process
import os
import time


def long_time_task(i):
    print('子进程: {} - 任务{}'.format(os.getpid(), i))
    time.sleep(2)
    print("结果: {}".format(8 ** 20))


if __name__=='__main__':
    print('当前母进程: {}'.format(os.getpid()))
    start = time.time()
    p1 = Process(target=long_time_task, args=(1,))
    p2 = Process(target=long_time_task, args=(2,))
    print('等待所有子进程完成。')
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    print("总共用时{}秒".format((end - start)))

注意的点:

  • 新创建的进程与进程的切换都是要耗资源的,所以平时工作中进程数不能开太大。
  • 同时可以运行的进程数一般受制于CPU的核数。
  • 除了使用Process方法,我们还可以使用Pool类创建多进程。

很多时候系统都需要创建多个进程以提高CPU的利用率,当数量较少时,可以手动生成一个个Process实例。当进程数量很多时,或许可以利用循环,但是这需要程序员手动管理系统中并发进程的数量,有时会很麻烦。这时进程池Pool就可以发挥其功效了。可以通过传递参数限制并发进程的数量,默认值为CPU的核数。

Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果进程池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

multiprocessing 模块下的Pool类的几个方法:

1.apply_async

函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

其作用是向进程池提交需要执行的函数及参数, 各个进程采用非阻塞(异步)的调用方式,即每个子进程只管运行自己的,不管其它进程是否已经完成。这是默认方式。

2.map()

函数原型:map(func, iterable[, chunksize=None])

Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回。 注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

3.map_async()

函数原型:map_async(func, iterable[, chunksize[, callback]]) 与map用法一致,但是它是非阻塞的。其有关事项见apply_async。

4.close()

关闭进程池(pool),使其不在接受新的任务。

5.terminate()

结束工作进程,不在处理未处理的任务。

6.join()

主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。

多进程中进程池的案例如下:

from multiprocessing import Pool, cpu_count
import os
import time


def long_time_task(i):
    print('子进程: {} - 任务{}'.format(os.getpid(), i))
    time.sleep(2)
    print("结果: {}".format(8 ** 20))


if __name__=='__main__':
    print("CPU内核数:{}".format(cpu_count()))
    print('当前母进程: {}'.format(os.getpid()))
    start = time.time()
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('等待所有子进程完成。')
    p.close()
    p.join()
    end = time.time()
    print("总共用时{}秒".format((end - start)))

对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close()或terminate()方法,让其不再接受新的Process了。

多进程间的数据共享与通信

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read:{}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

多线程案例

import threading
import time


def long_time_task(i):
    print('当前子线程: {} 任务{}'.format(threading.current_thread().name, i))
    time.sleep(2)
    print("结果: {}".format(8 ** 20))


if __name__=='__main__':
    start = time.time()
    print('这是主线程:{}'.format(threading.current_thread().name))
    thread_list = []
    for i in range(1, 3):
        t = threading.Thread(target=long_time_task, args=(i, ))
        thread_list.append(t)

    for t in thread_list:
        t.start()

    for t in thread_list:
        t.join()

    end = time.time()
    print("总共用时{}秒".format((end - start)))

不同线程之间的数据共享

一个进程所含的不同线程间共享内存,这就意味着任何一个变量都可以被任何一个线程修改,因此线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。如果不同线程间有共享的变量,其中一个方法就是在修改前给其上一把锁lock,确保一次只有一个线程能修改它。threading.lock()方法可以轻易实现对一个共享变量的锁定,修改完后release供其它线程使用。比如下例中账户余额balance是一个共享变量,使用lock可以使其不被改乱。

# -*- coding: utf-8 -*

import threading


class Account:
    def __init__(self):
        self.balance = 0

    def add(self, lock):
        # 获得锁
        lock.acquire()
        for i in range(0, 100000):
            self.balance += 1
        # 释放锁
        lock.release()

    def delete(self, lock):
        # 获得锁
        lock.acquire()
        for i in range(0, 100000):
            self.balance -= 1
            # 释放锁
        lock.release()


if __name__ == "__main__":
    account = Account()
    lock = threading.Lock()
    # 创建线程
   thread_add = threading.Thread(target=account.add, args=(lock,), name='Add')
    thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete')

    # 启动线程
   thread_add.start()
    thread_delete.start()

    # 等待线程结束
   thread_add.join()
    thread_delete.join()

    print('The final balance is: {}'.format(account.balance))