GVKun编程网logo

C/C++ 信号量 CreateSemaphore 用法(c++信号量有什么用)

13

对于想了解C/C++信号量CreateSemaphore用法的读者,本文将是一篇不可错过的文章,我们将详细介绍c++信号量有什么用,并且为您提供关于20、Semaphore信号量、C#多线程--信号量

对于想了解C/C++ 信号量 CreateSemaphore 用法的读者,本文将是一篇不可错过的文章,我们将详细介绍c++信号量有什么用,并且为您提供关于20、Semaphore信号量、C#多线程--信号量(Semaphore)、C#多线程--信号量(Semaphore)[z]、go中semaphore(信号量)源码解读的有价值信息。

本文目录一览:

C/C++ 信号量 CreateSemaphore 用法(c++信号量有什么用)

C/C++ 信号量 CreateSemaphore 用法(c++信号量有什么用)

HANDLE CreateSemaphore(
  LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,  // SD
  LONG lInitialCount,                          // initial count
  LONG lMaximumCount,                          // maximum count
  LPCTSTR lpName                           // object name
)
此函数可用来创建或打开一个信号量先看参数说明
lpSemaphoreAttributes:为信号量的属性,一般可以设置为NULL
lInitialCount:信号量初始值,必须大于等于0,而且小于等于 lpMaximumCount,如果lInitialCount 的初始值为0,则该信号量默认为unsignal状态,如果lInitialCount的初始值大于0,则该信号量默认为signal状态,
lMaximumCount: 此值为设置信号量的最大值,必须大于0
lpName:信号量的名字,长度不能超出MAX_PATH ,可设置为NULL,表示无名的信号量。当lpName不为空时,可创建有名的信号量,若当前信号量名与已存在的信号量的名字相同时,则该函数表示打开该信号量,这时参数lInitialCount 和
lMaximumCount 将被忽略。
函数调用成功返回信号量句柄。
释放信号量函数:
BOOL ReleaseSemaphore( HANDLE hSemaphore, // handle to semaphore
LONG lReleaseCount, // count increment amount
LPLONG lpPreviousCount // previous count);
参数说明:

hSemaphore:信号量句柄,
lReleaseCount:释放的数量,一般完成一个等待后调用此函数释放一个信号量,使得信号量平衡。
lpPreviousCount :存放以前信号量的数量 ,一般可为NULL.

打开信号量函数:
HANDLE OpenSemaphore( DWORD dwDesiredAccess, // access BOOL bInheritHandle, // inheritance option LPCTSTR lpName// object name );
此函数打开一个有名的信号量
参数说明:
dwDesiredAccess:对信号量的访问权限,取值可以是SEMAPHORE_ALL_ACCESS,可对信号量执行尽可能多的操作;可以是SEMAPHORE_MODIFY_STATE,允许使用ReleaseSemaphore释放信号量,达到修改信号量;还可以是SYNCHRONIZE,用等待函数异步的等待信号量变为signal状态
bInheritHandle:如果为true,信号量句柄可被继承,反之不可继承。
lpName :信号量的名字。

别忘了最后使用完成后用CloseHandle()关闭信号量句柄
暂时API写到此为止,以后还有关信号量的函数,再继续添加。下面通过一个例子,来说明使用信号量来世线程同步的例子,要求创建三个线程,每个线程次打印十次ID,必须三个线程一次打印。注意:在使用WaitForSingleObject等待信号量句柄时,若信号量为signal状态,则wait过后信号量自动减1,直到使用ReleaseSemaphore释放信号量,信号量才可增加
代码如下:
说明:创建控制台程序。

#include "stdafx.h"
#include <Windows.h>


DWORD WINAPI Thread_1(LPVOID param);
DWORD WINAPI Thread_2(LPVOID param);
DWORD WINAPI Thread_3(LPVOID param);

HANDLE hSM_1;
HANDLE hSM_2;
HANDLE hSM_3;

HANDLE hThread_1;
HANDLE hThread_2;
HANDLE hThread_3;

int _tmain(int argc, _TCHAR* argv[])
{

//创建三个信号量

hSM_1 = CreateSemaphore(NULL, 1, 1, L"A");//开始为signal状态
hSM_2 = CreateSemaphore(NULL, 0, 1, L"B");//开始为unsignal状态,等待hSM_1释放
hSM_3 = CreateSemaphore(NULL, 0, 1, L"C");//开始为unsignal状态,等待hSM_2

//创建三个线程

hThread_1 = CreateThread(NULL, 0, Thread_1, NULL, 0, NULL);
hThread_2 = CreateThread(NULL, 0, Thread_2, NULL, 0, NULL);
hThread_3 = CreateThread(NULL, 0, Thread_3, NULL, 0, NULL);
//等待三个线程都执行完
WaitForSingleObject(hThread_1, INFINITE);
WaitForSingleObject(hThread_2, INFINITE);
WaitForSingleObject(hThread_3, INFINITE);

//三个线程都执行完
printf("\n\n\t main end \n");
//关闭句柄
CloseHandle(hThread_1);
CloseHandle(hThread_2);
CloseHandle(hThread_3);
CloseHandle(hSM_1);
CloseHandle(hSM_2);
CloseHandle(hSM_3);
return 0;
}

DWORD WINAPI Thread_1(LPVOID param)
{
for (int i = 0; i < 10; i ++)
{
DWORD dwWait = WaitForSingleObject(hSM_1, INFINITE);

//每一个wait过后信号量的数量自动减1,这样就达到了控制同步

printf("A");
ReleaseSemaphore(hSM_2, 1, NULL);
}
return 0;
}

DWORD WINAPI Thread_2(LPVOID param)
{
for (int i = 0; i < 10; i ++)
{
WaitForSingleObject(hSM_2, INFINITE);
printf("B");
ReleaseSemaphore(hSM_3, 1, NULL);
}
return 0;
}

DWORD WINAPI Thread_3(LPVOID param)
{
for (int i = 0; i < 10; i ++)
{
WaitForSingleObject(hSM_3, INFINITE);
printf("C ");
ReleaseSemaphore(hSM_1, 1, NULL);
}
return 0;
}

 

 

运行结果:
C/C++ 信号量 CreateSemaphore 用法 - 快乐男孩 - 快乐男孩的博客

 

20、Semaphore信号量

20、Semaphore信号量

1、介绍

Synchronized锁针对的是”单值“临界资源,但是对于”多值“的临界资源,比如系统访问并发量为500,这个时候单纯使用Synchronized就需要自己额外判断剩余访问量,比较繁琐。此时可以引入Semaphore(信号量),它标记着有多个可用资源,只要能从其中拿到锁,就证明一定还有可用的连接,从而也省去了判断这一业务逻辑。

注意:连接池不行,因为获取信号量是可以并发获取的,如果同时有两个线程获取到信号量,并发去pool里面获取connect就会出问题。

Semaphore通过acquire(请求) 和 release(归还)两个方法实现对资源的获取和释放 注意:release必须在finally中执行,避免出现异常无法归还资源

2、代码

import java.util.concurrent.Semaphore;

/**
 * 设定一个初始信号量
 * 用多于这个幸好量的线程来争夺,最后发现每次最多三个线程处理
 */
public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);

        new Thread(() -> {
            while (true) {
                try {
                    semaphore.acquire();
                    System.out.println("线程  1   获得了锁");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }
        }).start();

        new Thread(() -> {
            while (true) {
                try {
                    semaphore.acquire();
                    System.out.println("线程  2   获得了锁");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }
        }).start();

        new Thread(() -> {
            while (true) {
                try {
                    semaphore.acquire();
                    System.out.println("线程  3   获得了锁");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }
        }).start();

        new Thread(() -> {
            while (true) {
                try {
                    semaphore.acquire();
                    System.out.println("线程  4   获得了锁");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }
        }).start();

        new Thread(() -> {
            while (true) {
                try {
                    semaphore.acquire();
                    System.out.println("线程  5   获得了锁");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }
        }).start();
    }
}

C#多线程--信号量(Semaphore)

C#多线程--信号量(Semaphore)

百度百科:Semaphore,是负责协调各个线程, 以保证它们能够正确、合理的使用公共资源。也是操作系统中用于控制进程同步互斥的量。

Semaphore常用的方法有两个WaitOne()Release(),Release()的作用是退出信号量并返回前一个计数,而WaitOne()则是阻止当前线程,直到当前线程的WaitHandle 收到信号。这里我举一个例子让大家更容易理解:当我们这样实例化Semaphore时候

Semaphore sema = new Semaphore(x,y)

有一队人排队上洗手间,人就相当于线程,x为还剩余的位置数量,y为总的位置数量。

WaitOne()方法就相当于人在等待洗手间位置的行为,而Release()方法就相当于一个人从洗手间出来的行为,这里再假设x和y都为5,说明开始的时候洗手间有5个空位置,且总共只有5个位置,当一队超过5个人的队伍要上洗手间的就排队,首先WaitOne()方法等待,发现有空位就依次进去,每进去一个空位减一,直到进去5之后个没有空位,这时候后面的人就一直等待,直到进去的人从洗手间出来Release()方法,空位加一,在等待WaitOne()方法的人发现有空位又进去一个空位减一……如此循环往复。

请看下面例子:

public class Program
    {
        static Semaphore sema = new Semaphore(5, 5);
        const int cycleNum = 9;
        static void Main(string[] args) 
        {
            for(int i = 0; i < cycleNum; i++)
            {
                Thread td = new Thread(new ParameterizedThreadStart(testFun));
                td.Name = string.Format("编号{0}",i.ToString());
                td.Start(td.Name);
            }
            Console.ReadKey();
        }
        public static void testFun(object obj)
        {
            sema.WaitOne();
            Console.WriteLine(obj.ToString() + "进洗手间:" + DateTime.Now.ToString());
            Thread.Sleep(2000);
            Console.WriteLine(obj.ToString() + "出洗手间:" + DateTime.Now.ToString());
            sema.Release();
        }
    }

运行结果如下:

image

这里我要说明一点,信号量控制的只是线程同步的量,而不管顺序,这个例子来说线程控制的就是线程同步量为5,也就是同时并发的线程数量为5个,至于是哪个先哪个后不是由这里的信号量决定的。

当然这个例子中因没有做什么复杂的操作,一般情况进入线程的时间和每个线程要的时间不会有太大差别,所以执行的顺序还是很规律的(为了说明这个问题我也是运行了多次才让结果稍有不同,这里编号2抢在了编号1前面就是这个道理),如果线程中有很复杂的操作每个线程在运行中所用的时间有比较大的差别,或者循环开始有复杂操作那么很可能就不是编号0先进入洗手间了,且不一定是先进入的就会先出来。

接下来再简单介绍一下Semaphore的WaitOne()和Release()的重载方法

public int Release(int releaseCount);

releaseCount指的是释放的信号量数量,如果没有参数默认为1,Release()就相当于Release(1)

这里要说明一点,当Release()或者Release(int releaseCount)执行时导致信号量计数大于最大数量时会抛出SemaphoreFullException异常。下面这种情况就会异常:

Semaphore sem = new Semaphore(4,5);
sem.Release(2);//这里是释放2个信号量加上之前的4个,超出5个了

public virtual bool WaitOne(TimeSpan timeout);
public virtual bool WaitOne(int millisecondsTimeout);

第一个重载参数timeout:指定时间间隔,若在这段时间内没有接收到信号则跳过等待继续执行

第二个重载参数millisecondsTimeout:指定时间间隔整数毫秒,若在这段时间内没有接收到信号则跳过等待继续执行

WaitOne()还有两个重载方法不是很常用这里就不介绍了。上面的重载方法这里也不再进了案例说明了,有兴趣的朋友可以自己尝试一下。

 

C#多线程--信号量(Semaphore)[z]

C#多线程--信号量(Semaphore)[z]

百度百科:Semaphore,是负责协调各个线程, 以保证它们能够正确、合理的使用公共资源。也是操作系统中用于控制进程同步互斥的量。

Semaphore常用的方法有两个WaitOne()Release(),Release()的作用是退出信号量并返回前一个计数,而WaitOne()则是阻止当前线程,直到当前线程的WaitHandle 收到信号。这里我举一个例子让大家更容易理解:当我们这样实例化Semaphore时候

Semaphore sema = new Semaphore(x,y)

有一队人排队上洗手间,人就相当于线程,x为还剩余的位置数量,y为总的位置数量。

WaitOne()方法就相当于人在等待洗手间位置的行为,而Release()方法就相当于一个人从洗手间出来的行为,这里再假设x和y都为5,说明开始的时候洗手间有5个空位置,且总共只有5个位置,当一队超过5个人的队伍要上洗手间的就排队,首先WaitOne()方法等待,发现有空位就依次进去,每进去一个空位减一,直到进去5之后个没有空位,这时候后面的人就一直等待,直到进去的人从洗手间出来Release()方法,空位加一,在等待WaitOne()方法的人发现有空位又进去一个空位减一……如此循环往复。

请看下面例子:

复制代码
public class Program
    {
        static Semaphore sema = new Semaphore(5, 5);
        const int cycleNum = 9;
        static void Main(string[] args) 
        {
            for(int i = 0; i < cycleNum; i++)
            {
                Thread td = new Thread(new ParameterizedThreadStart(testFun));
                td.Name = string.Format("编号{0}",i.ToString());
                td.Start(td.Name);
            }
            Console.ReadKey();
        }
        public static void testFun(object obj)
        {
            sema.WaitOne();
            Console.WriteLine(obj.ToString() + "进洗手间:" + DateTime.Now.ToString());
            Thread.Sleep(2000);
            Console.WriteLine(obj.ToString() + "出洗手间:" + DateTime.Now.ToString());
            sema.Release();
        }
    }
复制代码

运行结果如下:

image

这里我要说明一点,信号量控制的只是线程同步的量,而不管顺序,这个例子来说线程控制的就是线程同步量为5,也就是同时并发的线程数量为5个,至于是哪个先哪个后不是由这里的信号量决定的。

当然这个例子中因没有做什么复杂的操作,一般情况进入线程的时间和每个线程要的时间不会有太大差别,所以执行的顺序还是很规律的(为了说明这个问题我也是运行了多次才让结果稍有不同,这里编号2抢在了编号1前面就是这个道理),如果线程中有很复杂的操作每个线程在运行中所用的时间有比较大的差别,或者循环开始有复杂操作那么很可能就不是编号0先进入洗手间了,且不一定是先进入的就会先出来。

接下来再简单介绍一下Semaphore的WaitOne()和Release()的重载方法

public int Release(int releaseCount);

releaseCount指的是释放的信号量数量,如果没有参数默认为1,Release()就相当于Release(1)

这里要说明一点,当Release()或者Release(int releaseCount)执行时导致信号量计数大于最大数量时会抛出SemaphoreFullException异常。下面这种情况就会异常:

Semaphore sem = new Semaphore(4,5);
sem.Release(2);//这里是释放2个信号量加上之前的4个,超出5个了

 

public virtual bool WaitOne(TimeSpan timeout);
public virtual bool WaitOne(int millisecondsTimeout);

第一个重载参数timeout:指定时间间隔,若在这段时间内没有接收到信号则跳过等待继续执行

第二个重载参数millisecondsTimeout:指定时间间隔整数毫秒,若在这段时间内没有接收到信号则跳过等待继续执行

WaitOne()还有两个重载方法不是很常用这里就不介绍了。上面的重载方法这里也不再进了案例说明了,有兴趣的朋友可以自己尝试一下。

go中semaphore(信号量)源码解读

go中semaphore(信号量)源码解读

  • 运行时信号量机制 semaphore
    • 前言
    • 作用是什么
    • 几个主要的方法
    • 如何实现
    • sudog 缓存
      • acquireSudog
      • releaseSudog
    • semaphore
      • poll_runtime_Semacquire/sync_runtime_SemacquireMutex
      • sync_runtime_Semrelease
    • 参考

运行时信号量机制 semaphore

前言

最近在看源码,发现好多地方用到了这个semaphore

本文是在go version go1.13.15 darwin/amd64上进行的

作用是什么

下面是官方的描述

// Semaphore implementation exposed to Go.
// Intended use is provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.

// 具体的用法是提供 sleep 和 wakeup 原语
// 以使其能够在其它同步原语中的竞争情况下使用
// 因此这里的 semaphore 和 Linux 中的 futex 目标是一致的
// 只不过语义上更简单一些
//
// 也就是说,不要认为这些是信号量
// 把这里的东西看作 sleep 和 wakeup 实现的一种方式
// 每一个 sleep 都会和一个 wakeup 配对
// 即使在发生 race 时,wakeup 在 sleep 之前时也是如此  

上面提到了和futex作用一样,关于futex

futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁如信号量和POSIX互斥的基本工具

Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用cpu提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在用户空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用。

go中的semaphore作用和futex目标一样,提供sleepwakeup原语,使其能够在其它同步原语中的竞争情况下使用。当一个goroutine需要休眠时,将其进行集中存放,当需要wakeup时,再将其取出,重新放入调度器中。

例如在读写锁的实现中,读锁和写锁之前的相互阻塞唤醒,就是通过sleepwakeup实现,当有读锁存在的时候,新加入的写锁通过semaphore阻塞自己,当前面的读锁完成,在通过semaphore唤醒被阻塞的写锁。

写锁

// 获取互斥锁
// 阻塞等待所有读操作结束(如果有的话)
func (rw *RWMutex) Lock() {
	...
	// 原子的修改readerCount的值,直接将readerCount减去rwmutexMaxReaders
	// 说明,有写锁进来了,这在上面的读锁中也有体现
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// 当r不为0说明,当前写锁之前有读锁的存在
	// 修改下readerWait,也就是当前写锁需要等待的读锁的个数  
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		// 阻塞当前写锁
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	...
}

通过runtime_SemacquireMutex对当前写锁进行sleep

读锁释放

// 减少读操作计数,即readerCount--
// 唤醒等待写操作的协程(如果有的话)
func (rw *RWMutex) RUnlock() {
	...
	// 首先通过atomic的原子性使readerCount-1
	// 1.若readerCount大于0, 证明当前还有读锁, 直接结束本次操作
	// 2.若readerCount小于0, 证明已经没有读锁, 但是还有因为读锁被阻塞的写锁存在
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// 尝试唤醒被阻塞的写锁
		rw.rUnlockSlow(r)
	}
	...
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	...
	// readerWait--操作,如果readerWait--操作之后的值为0,说明,写锁之前,已经没有读锁了
	// 通过writerSem信号量,唤醒队列中第一个阻塞的写锁
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// 唤醒一个写锁
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

写锁处理完之后,调用runtime_Semrelease来唤醒sleep的写锁

几个主要的方法

go/src/sync/runtime.go中,定义了这几个方法

// Semacquire等待*s > 0,然后原子递减它。
// 它是一个简单的睡眠原语,用于同步
// library and不应该直接使用。
func runtime_Semacquire(s *uint32)

// SemacquireMutex类似于Semacquire,用来阻塞互斥的对象
// 如果lifo为true,waiter将会被插入到队列的头部
// skipframes是跟踪过程中要省略的帧数,从这里开始计算
// runtime_SemacquireMutex's caller.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

// Semrelease会自动增加*s并通知一个被Semacquire阻塞的等待的goroutine
// 它是一个简单的唤醒原语,用于同步
// library and不应该直接使用。
// 如果handoff为true, 传递信号到队列头部的waiter
// skipframes是跟踪过程中要省略的帧数,从这里开始计算
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

具体的实现是在go/src/runtime/sema.go

//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
	semacquire1(addr, false, semaBlockProfile, 0)
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

如何实现

sudog 缓存

semaphore的实现使用到了sudog,我们先来看下

sudog 是运行时用来存放处于阻塞状态的goroutine的一个上层抽象,是用来实现用户态信号量的主要机制之一。 例如当一个goroutine因为等待channel的数据需要进行阻塞时,sudog会将goroutine及其用于等待数据的位置进行记录, 并进而串联成一个等待队列,或二叉平衡树。

// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
	// 以下字段受hchan保护
	g *g

	// isSelect 表示 g 正在参与一个 select, so
	// 因此 g.selectDone 必须以 CAS 的方式来获取wake-up race.
	isSelect bool
	next     *sudog
	prev     *sudog
	elem     unsafe.Pointer // 数据元素(可能指向栈)

	// 以下字段不会并发访问。
	// 对于通道,waitlink只被g访问。
	// 对于信号量,所有字段(包括上面的字段)
	// 只有当持有一个semroot锁时才被访问。
	acquiretime int64
	releasetime int64
	ticket      uint32
	parent      *sudog //semaRoot 二叉树
	waitlink    *sudog // g.waiting 列表或 semaRoot
	waittail    *sudog // semaRoot
	c           *hchan // channel
}

sudog的获取和归还,遵循以下策略:

1、获取,首先从per-P缓存获取,对于per-P缓存,如果per-P缓存为空,则从全局池抓取一半,然后取出per-P缓存中的最后一个;

2、归还,归还到per-P缓存,如果per-P缓存满了,就把per-P缓存的一半归还到全局缓存中,然后归还sudogper-P缓存中。

acquireSudog

1、如果per-P缓存的内容没达到长度的一般,则会从全局额缓存中抓取一半;

2、然后返回把per-P缓存中最后一个sudog返回,并且置空;

// go/src/runtime/proc.go
//go:nosplit
func acquireSudog() *sudog {
	// Delicate dance: 信号量的实现调用acquireSudog,然后acquireSudog调用new(sudog)
	// new调用malloc, malloc调用垃圾收集器,垃圾收集器在stopTheWorld调用信号量
	// 通过在new(sudog)周围执行acquirem/releasem来打破循环
	// acquirem/releasem在new(sudog)期间增加m.locks,防止垃圾收集器被调用。

	// 获取当前 g 所在的 m
	mp := acquirem()
	// 获取p的指针
	pp := mp.p.ptr()
	if len(pp.sudogcache) == 0 {
		lock(&sched.sudoglock)
		// 首先,尝试从中央缓存获取一批数据。
		for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
			s := sched.sudogcache
			sched.sudogcache = s.next
			s.next = nil
			pp.sudogcache = append(pp.sudogcache, s)
		}
		unlock(&sched.sudoglock)
		// 如果中央缓存中没有,新分配
		if len(pp.sudogcache) == 0 {
			pp.sudogcache = append(pp.sudogcache, new(sudog))
		}
	}
	// 取缓存中最后一个
	n := len(pp.sudogcache)
	s := pp.sudogcache[n-1]
	pp.sudogcache[n-1] = nil
	// 将刚取出的在缓存中移除
	pp.sudogcache = pp.sudogcache[:n-1]
	if s.elem != nil {
		throw("acquireSudog: found s.elem != nil in cache")
	}
	releasem(mp)
	return s
}

releaseSudog

1、如果per-P缓存满了,就归还per-P缓存一般的内容到全局缓存;

2、然后将回收的sudog放到per-P缓存中。

// go/src/runtime/proc.go
//go:nosplit
func releaseSudog(s *sudog) {
	if s.elem != nil {
		throw("runtime: sudog with non-nil elem")
	}
	if s.isSelect {
		throw("runtime: sudog with non-false isSelect")
	}
	if s.next != nil {
		throw("runtime: sudog with non-nil next")
	}
	if s.prev != nil {
		throw("runtime: sudog with non-nil prev")
	}
	if s.waitlink != nil {
		throw("runtime: sudog with non-nil waitlink")
	}
	if s.c != nil {
		throw("runtime: sudog with non-nil c")
	}
	gp := getg()
	if gp.param != nil {
		throw("runtime: releaseSudog with non-nil gp.param")
	}
	// 避免重新安排到另一个P
	mp := acquirem() // avoid rescheduling to another P
	pp := mp.p.ptr()
	// 如果缓存满了
	if len(pp.sudogcache) == cap(pp.sudogcache) {
		// 将本地高速缓存的一半传输到中央高速缓存
		var first, last *sudog
		for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
			n := len(pp.sudogcache)
			p := pp.sudogcache[n-1]
			pp.sudogcache[n-1] = nil
			pp.sudogcache = pp.sudogcache[:n-1]
			if first == nil {
				first = p
			} else {
				last.next = p
			}
			last = p
		}
		lock(&sched.sudoglock)
		last.next = sched.sudogcache
		sched.sudogcache = first
		unlock(&sched.sudoglock)
	}
	// 归还sudog到`per-P`缓存中
	pp.sudogcache = append(pp.sudogcache, s)
	releasem(mp)
}

semaphore

// go/src/runtime/sema.go
// 用于sync.Mutex的异步信号量。

// semaRoot拥有一个具有不同地址(s.elem)的sudog平衡树。
// 每个sudog都可以依次(通过s.waitlink)指向一个列表,在相同地址上等待的其他sudog。
// 对具有相同地址的sudog内部列表进行的操作全部为O(1)。顶层semaRoot列表的扫描为O(log n),
// 其中,n是阻止goroutines的不同地址的数量,通过他们散列到给定的semaRoot。
type semaRoot struct {
	lock  mutex
	// waiters的平衡树的根节点
	treap *sudog
	// waiters的数量,读取的时候无所
	nwait uint32
}

// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
	root semaRoot
	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

poll_runtime_Semacquire/sync_runtime_SemacquireMutex

// go/src/runtime/sema.go
//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
	semacquire1(addr, false, semaBlockProfile, 0)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}


func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
	// 判断这个goroutine,是否是m上正在运行的那个
	gp := getg()
	if gp != gp.m.curg {
		throw("semacquire not on the G stack")
	}

	// *addr -= 1
	if cansemacquire(addr) {
		return
	}

	// 增加等待计数
	// 再试一次 cansemacquire 如果成功则直接返回
	// 将自己作为等待者入队
	// 休眠
	// (等待器描述符由出队信号产生出队行为)

	// 获取一个sudog
	s := acquireSudog()
	root := semroot(addr)
	t0 := int64(0)
	s.releasetime = 0
	s.acquiretime = 0
	s.ticket = 0
	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
		if t0 == 0 {
			t0 = cputicks()
		}
		s.acquiretime = t0
	}
	for {
		lock(&root.lock)
		// 添加我们自己到nwait来禁用semrelease中的"easy case"
		atomic.Xadd(&root.nwait, 1)
		// 检查cansemacquire避免错过唤醒
		if cansemacquire(addr) {
			atomic.Xadd(&root.nwait, -1)
			unlock(&root.lock)
			break
		}
		// 任何在 cansemacquire 之后的 semrelease 都知道我们在等待(因为设置了 nwait),因此休眠

		// 队列将s添加到semaRoot中被阻止的goroutine中
		root.queue(addr, s, lifo)
		// 将当前goroutine置于等待状态并解锁锁。
		// 通过调用goready(gp),可以使goroutine再次可运行。
		goparkunlock(&root.lock, waitReasonSemacquire, traceEvgoBlockSync, 4+skipframes)
		if s.ticket != 0 || cansemacquire(addr) {
			break
		}
	}
	if s.releasetime > 0 {
		blockevent(s.releasetime-t0, 3+skipframes)
	}

	// 归还sudog
	releaseSudog(s)
}

func cansemacquire(addr *uint32) bool {
	for {
		v := atomic.Load(addr)
		if v == 0 {
			return false
		}
		if atomic.Cas(addr, v, v-1) {
			return true
		}
	}
}

sync_runtime_Semrelease

// go/src/runtime/sema.go
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}

func semrelease1(addr *uint32, handoff bool, skipframes int) {
	root := semroot(addr)
	atomic.Xadd(addr, 1)

	// Easy case:没有等待者
	// 这个检查必须发生在xadd之后,以避免错过唤醒
	if atomic.Load(&root.nwait) == 0 {
		return
	}

	// Harder case: 找到等待者,并且唤醒
	lock(&root.lock)
	if atomic.Load(&root.nwait) == 0 {
		// 该计数已被另一个goroutine占用,
		// 因此无需唤醒其他goroutine。
		unlock(&root.lock)
		return
	}

	// 搜索一个等待着然后将其唤醒
	s, t0 := root.dequeue(addr)
	if s != nil {
		atomic.Xadd(&root.nwait, -1)
	}
	unlock(&root.lock)
	if s != nil { // 可能会很慢,因此先解锁
		acquiretime := s.acquiretime
		if acquiretime != 0 {
			mutexevent(t0-acquiretime, 3+skipframes)
		}
		if s.ticket != 0 {
			throw("corrupted semaphore ticket")
		}
		if handoff && cansemacquire(addr) {
			s.ticket = 1
		}
		// goready(s.g, 5) 
		// 标记 runnable,等待被重新调度
		readyWithTime(s, 5+skipframes)
	}
}

摘自"同步原语"的一段总结

这一对 semacquire 和 semrelease 理解上可能不太直观。 首先,我们必须意识到这两个函数一定是在两个不同的 M(线程)上得到执行,否则不会出现并发,我们不妨设为 M1 和 M2。 当 M1 上的 G1 执行到 semacquire1 时,如果快速路径成功,则说明 G1 抢到锁,能够继续执行。但一旦失败且在慢速路径下 依然抢不到锁,则会进入 goparkunlock,将当前的 G1 放到等待队列中,进而让 M1 切换并执行其他 G。 当 M2 上的 G2 开始调用 semrelease1 时,只是单纯的将等待队列的 G1 重新放到调度队列中,而当 G1 重新被调度时(假设运气好又在 M1 上被调度),代码仍然会从 goparkunlock 之后开始执行,并再次尝试竞争信号量,如果成功,则会归还 sudog。

参考

【同步原语】https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/sync/
【Go并发编程实战--信号量的使用方法和其实现原理】https://juejin.cn/post/6906677772479889422
【Semaphore】https://github.com/cch123/golang-notes/blob/master/semaphore.md
【进程同步之信号量机制(pv操作)及三个经典同步问题】https://blog.csdn.net/SpeedMe/article/details/17597373

本文作者:liz
本文链接:https://boilingfrog.github.io/2021/04/02/semaphore/
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

关于C/C++ 信号量 CreateSemaphore 用法c++信号量有什么用的问题我们已经讲解完毕,感谢您的阅读,如果还想了解更多关于20、Semaphore信号量、C#多线程--信号量(Semaphore)、C#多线程--信号量(Semaphore)[z]、go中semaphore(信号量)源码解读等相关内容,可以在本站寻找。

本文标签: