读写锁是把共享资源的访问分为读者和写者。读者能共享访问,写者能进行写操作.很明显这个比简单的互斥访问要好很多。提高性能效率我的理解是在设计上遵循以下原则
1.锁的深度 范围
2.多线程对共享资源访问的频率 次数
3.让访问尽量可控。
读写锁我觉得就是让这种访问可控.
一、读写锁的特征
1.当读写锁是写加锁状态时, 在这个锁被解锁之前, 所有试图对这个锁加锁的线程都会被阻塞.
2.当读写锁在读加锁状态时, 所有试图以读模式对它进行加锁的线程都可以得到访问权, 但是如果线程希望以写模式对此锁进行加锁, 它必须直到所有的线程释放锁.
3.通常, 当读写锁处于读模式锁住状态时, 如果有另外线程试图以写模式加锁, 读写锁通常会阻塞随后的读模式锁请求, 这样可以避免读模式锁长期占用, 而等待的写模式锁请求长期阻塞.
要满足以上3点,那pthread中是如何实现的呢?有哪些借鉴意义呢?是不是可以扩展一下呢? 在windows上又如何实现呢?
二、读写锁的应用范围场景
适合于多个线程读,一个线程写。
三、pthread中的具体实现:
pthread中的具体实现包括 pthread_rwlock_init.c、pthread_rwlock_destroy.c、pthread_rwlock_rdlock.c、pthread_rwlock_wrlock.c、pthread_rwlock_tryrdlock.c、pthread_rwlock_trywrlock.c、pthread_rwlock_unlock.c.
pthread_rwlock_init.c:
int __pthread_rwlock_init (rwlock, attr)
pthread_rwlock_t *rwlock;
const pthread_rwlockattr_t *attr;
{
const struct pthread_rwlockattr *iattr;
iattr = ((const struct pthread_rwlockattr *) attr) ?: &default_rwlockattr;
memset (rwlock, '\0', sizeof (*rwlock));
rwlock->__data.__flags
= iattr->lockkind == PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP;
#ifdef __ASSUME_PRIVATE_FUTEX
rwlock->__data.__shared = (iattr->pshared == PTHREAD_PROCESS_PRIVATE
? 0 : FUTEX_PRIVATE_FLAG);
#else
rwlock->__data.__shared = (iattr->pshared == PTHREAD_PROCESS_PRIVATE
? 0
: THREAD_GETMEM (THREAD_SELF,
header.private_futex));
#endif
return 0;
}
这个init就是初始化一下rwlock.
重点是pthread_rwlock_rdlock.c、pthread_rwlock_wrlock.c 和pthread_rwlock_unlock.c的实现
pthread_rwlock_rdlock.c:
int
__pthread_rwlock_rdlock (rwlock)
pthread_rwlock_t *rwlock;
{
int result = 0;
LIBC_PROBE (rdlock_entry, 1, rwlock);
/* Make sure we are alone. */
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
while (1)
{
/* Get the rwlock if there is no writer... */
if (rwlock->__data.__writer == 0
/* ...and if either no writer is waiting or we prefer readers. */
&& (!rwlock->__data.__nr_writers_queued
|| PTHREAD_RWLOCK_PREFER_READER_P (rwlock)))
{
/* Increment the reader counter. Avoid overflow. */
if (__builtin_expect (++rwlock->__data.__nr_readers == 0, 0))
{
/* Overflow on number of readers. */
--rwlock->__data.__nr_readers;
result = EAGAIN;
}
else
LIBC_PROBE (rdlock_acquire_read, 1, rwlock);
break;
}
/* Make sure we are not holding the rwlock as a writer. This is
a deadlock situation we recognize and report. */
if (__builtin_expect (rwlock->__data.__writer
== THREAD_GETMEM (THREAD_SELF, tid), 0))
{
result = EDEADLK;
break;
}
/* Remember that we are a reader. */
if (__builtin_expect (++rwlock->__data.__nr_readers_queued == 0, 0))
{
/* Overflow on number of queued readers. */
--rwlock->__data.__nr_readers_queued;
result = EAGAIN;
break;
}
int waitval = rwlock->__data.__readers_wakeup;
/* Free the lock. */
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
/* Wait for the writer to finish. */
lll_futex_wait (&rwlock->__data.__readers_wakeup, waitval,
rwlock->__data.__shared);
/* Get the lock. */
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
--rwlock->__data.__nr_readers_queued;
}
/* We are done, free the lock. */
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
return result;
}
pthread_rwlock_wrlock.c:
int
__pthread_rwlock_wrlock (rwlock)
pthread_rwlock_t *rwlock;
{
int result = 0;
LIBC_PROBE (wrlock_entry, 1, rwlock);
/* Make sure we are alone. */
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
while (1)
{
/* Get the rwlock if there is no writer and no reader. */
if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0)
{
/* Mark self as writer. */
rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid);
LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
break;
}
/* Make sure we are not holding the rwlock as a writer. This is
a deadlock situation we recognize and report. */
if (__builtin_expect (rwlock->__data.__writer
== THREAD_GETMEM (THREAD_SELF, tid), 0))
{
result = EDEADLK;
break;
}
/* Remember that we are a writer. */
if (++rwlock->__data.__nr_writers_queued == 0)
{
/* Overflow on number of queued writers. */
--rwlock->__data.__nr_writers_queued;
result = EAGAIN;
break;
}
int waitval = rwlock->__data.__writer_wakeup;
/* Free the lock. */
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
/* Wait for the writer or reader(s) to finish. */
lll_futex_wait (&rwlock->__data.__writer_wakeup, waitval,
rwlock->__data.__shared);
/* Get the lock. */
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
/* To start over again, remove the thread from the writer list. */
--rwlock->__data.__nr_writers_queued;
}
/* We are done, free the lock. */
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
return result;
}
pthread_rwlock_unlock.c:
int
__pthread_rwlock_unlock (pthread_rwlock_t *rwlock)
{
LIBC_PROBE (rwlock_unlock, 1, rwlock);
lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
if (rwlock->__data.__writer)
rwlock->__data.__writer = 0;
else
--rwlock->__data.__nr_readers;
if (rwlock->__data.__nr_readers == 0)
{
if (rwlock->__data.__nr_writers_queued)
{
++rwlock->__data.__writer_wakeup;
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
lll_futex_wake (&rwlock->__data.__writer_wakeup, 1,
rwlock->__data.__shared);
return 0;
}
else if (rwlock->__data.__nr_readers_queued)
{
++rwlock->__data.__readers_wakeup;
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
lll_futex_wake (&rwlock->__data.__readers_wakeup, INT_MAX,
rwlock->__data.__shared);
return 0;
}
}
lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
return 0;
}
整理一下实现的思路
几个标志控制变量的说明
rwlock->__data.__writer:非0表示当前被写者占用
rwlock->__data.__nr_readers:读者共享占用资源的读者数量
rwlock->__data.__nr_writers_queued:写者的锁请求数
rwlock->__data.__nr_readers_queued:读者的锁请求数
写锁执行流程:
1.lock锁定 (实现对一些公共的控制标志的顺序访问)
2.判断如果没有写者也没有读者在访问共享资源
则将写者rwlock->__data.__writer 标记成自己.非零值 退出循环的自旋方式 lock解锁
3. 如果有别的在访问共享资源 则将写队列请求数加一(++rwlock->__data.__nr_writers_queued)
表示有写请求要访问资源,然后释放lock锁定 等待其他的已经占用的线程访问完成。
4.其他的访问资源 完成后,在pthread_rwlock_unlock中判断如果没有读者占用了,唤醒第3步的等待
5.重新再lock锁定,将写者锁定请求数减一,重新回到2处判断。这时因为其他的读者锁请求没有唤醒 所以优先第2步就执行了也就是满足了读写锁的特征3.
读锁执行流程:
1.lock锁定 (实现对一些公共的控制标志的顺序访问)
2.判断如果没有写者且没有写者锁请求
则将读者++rwlock->__data.__nr_readers 退出循环的自旋方式 lock解锁
3. 如果不符合条件 也就是有写者在占用共享资源或者有写锁请求在申请,则将读队列请求数加一(++rwlock->__data.__nr_readers_queued)
表示有读者锁请求要访问资源,然后释放lock锁定 等待其他的已经占用的线程访问完成(其实就是等待写者释放占用)。
4.写者访问资源 完成后,在pthread_rwlock_unlock中判断如果没有读者占用了,唤醒第3步的等待
5.重新再lock锁定,将读者锁定请求数减一,重新回到2处判断。
解锁执行流程:
1.判断如果有写者标识 则将其置0 rwlock->__data.__writer =0
2.否则将读者拥有的减1 rwlock->__data.__nr_readers--
3.判断如果没有读者占用资源了,则判断是否有写锁请求,如果有优先唤醒退出 否则唤醒读锁请求
try相关的函数和上面的类似就不分析了。
四、基于读写锁的无锁的类似实现的一种模型
以前写了一个类似的读写锁方式的一个无锁方式的 windows上的实现
读方式:
1.判断写标志是否可用,可用表示不能读 不可用表示能读
2.如果不可用,则引用计数加1(原子操作InterlockedIncrement方式),表示占用资源了。
3.访问共享资源 共享资源的数据层面加校验 校验不对 读错了 丢弃。
4.引用技术减1(原子操作)
写方式:
1.置写标志可用
2.超时时间内等待引用计数是否为0 (超时时间尽量长一点)
3.写共享资源
4.置引用计数为0
5.置写标志不可用
五、总结
用基本层面的lock锁加一些标志 就实现了新的一种控制方式。根本点是让争抢资源人为的干预控制,而不是无头苍蝇那种你争我夺。根据业务需求还可以调整用try无阻塞去做些别的事情然后回来再判断是否能访问共享资源.
pthread中的无论读写都用同一lock锁定是否稍微影响了效率呢?根据具体的功能可以稍微变通一下。