现在的位置: 首页 > 自动控制 > 工业·编程 > 正文

读写锁原理和pthread中的实现分析

2016-07-19 22:58 工业·编程 ⁄ 共 6745字 ⁄ 字号 暂无评论

读写锁是把共享资源的访问分为读者和写者。读者能共享访问,写者能进行写操作.很明显这个比简单的互斥访问要好很多。提高性能效率我的理解是在设计上遵循以下原则

1.锁的深度 范围

2.多线程对共享资源访问的频率 次数

3.让访问尽量可控。

读写锁我觉得就是让这种访问可控.

一、读写锁的特征

  1.当读写锁是写加锁状态时, 在这个锁被解锁之前, 所有试图对这个锁加锁的线程都会被阻塞.

  2.当读写锁在读加锁状态时, 所有试图以读模式对它进行加锁的线程都可以得到访问权, 但是如果线程希望以写模式对此锁进行加锁, 它必须直到所有的线程释放锁.

  3.通常, 当读写锁处于读模式锁住状态时, 如果有另外线程试图以写模式加锁, 读写锁通常会阻塞随后的读模式锁请求, 这样可以避免读模式锁长期占用, 而等待的写模式锁请求长期阻塞.

   要满足以上3点,那pthread中是如何实现的呢?有哪些借鉴意义呢?是不是可以扩展一下呢? 在windows上又如何实现呢?

二、读写锁的应用范围场景

   适合于多个线程读,一个线程写。

三、pthread中的具体实现:

   pthread中的具体实现包括 pthread_rwlock_init.c、pthread_rwlock_destroy.c、pthread_rwlock_rdlock.c、pthread_rwlock_wrlock.c、pthread_rwlock_tryrdlock.c、pthread_rwlock_trywrlock.c、pthread_rwlock_unlock.c.

  pthread_rwlock_init.c:

    

int   __pthread_rwlock_init (rwlock, attr)

     pthread_rwlock_t *rwlock;

     const pthread_rwlockattr_t *attr;

{

  const struct pthread_rwlockattr *iattr;

  iattr = ((const struct pthread_rwlockattr *) attr) ?: &default_rwlockattr;

  memset (rwlock, '\0', sizeof (*rwlock));

  rwlock->__data.__flags

    = iattr->lockkind == PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP;

  #ifdef __ASSUME_PRIVATE_FUTEX

  rwlock->__data.__shared = (iattr->pshared == PTHREAD_PROCESS_PRIVATE

        ? 0 : FUTEX_PRIVATE_FLAG);

#else

  rwlock->__data.__shared = (iattr->pshared == PTHREAD_PROCESS_PRIVATE

        ? 0

        : THREAD_GETMEM (THREAD_SELF,

           header.private_futex));

#endif

  return 0;

}

这个init就是初始化一下rwlock.

重点是pthread_rwlock_rdlock.c、pthread_rwlock_wrlock.c 和pthread_rwlock_unlock.c的实现

pthread_rwlock_rdlock.c:

int

__pthread_rwlock_rdlock (rwlock)

     pthread_rwlock_t *rwlock;

{

  int result = 0;

  LIBC_PROBE (rdlock_entry, 1, rwlock);

  /* Make sure we are alone.  */

  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);

  while (1)

    {

      /* Get the rwlock if there is no writer...  */

      if (rwlock->__data.__writer == 0

   /* ...and if either no writer is waiting or we prefer readers.  */

   && (!rwlock->__data.__nr_writers_queued

       || PTHREAD_RWLOCK_PREFER_READER_P (rwlock)))

{

   /* Increment the reader counter.  Avoid overflow.  */

   if (__builtin_expect (++rwlock->__data.__nr_readers == 0, 0))

     {

       /* Overflow on number of readers.  */

       --rwlock->__data.__nr_readers;

       result = EAGAIN;

     }

   else

     LIBC_PROBE (rdlock_acquire_read, 1, rwlock);

   break;

}

      /* Make sure we are not holding the rwlock as a writer.  This is

  a deadlock situation we recognize and report.  */

      if (__builtin_expect (rwlock->__data.__writer

       == THREAD_GETMEM (THREAD_SELF, tid), 0))

{

   result = EDEADLK;

   break;

}

      /* Remember that we are a reader.  */

      if (__builtin_expect (++rwlock->__data.__nr_readers_queued == 0, 0))

{

   /* Overflow on number of queued readers.  */

   --rwlock->__data.__nr_readers_queued;

   result = EAGAIN;

   break;

}

      int waitval = rwlock->__data.__readers_wakeup;

      /* Free the lock.  */

      lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);

      /* Wait for the writer to finish.  */

      lll_futex_wait (&rwlock->__data.__readers_wakeup, waitval,

        rwlock->__data.__shared);

      /* Get the lock.  */

      lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);

      --rwlock->__data.__nr_readers_queued;

    }

  /* We are done, free the lock.  */

  lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);

  return result;

}

pthread_rwlock_wrlock.c:

int

__pthread_rwlock_wrlock (rwlock)

     pthread_rwlock_t *rwlock;

{

  int result = 0;

  LIBC_PROBE (wrlock_entry, 1, rwlock);

  /* Make sure we are alone.  */

  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);

  while (1)

    {

      /* Get the rwlock if there is no writer and no reader.  */

      if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0)

{

   /* Mark self as writer.  */

   rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid);

   LIBC_PROBE (wrlock_acquire_write, 1, rwlock);

   break;

}

      /* Make sure we are not holding the rwlock as a writer.  This is

  a deadlock situation we recognize and report.  */

      if (__builtin_expect (rwlock->__data.__writer

       == THREAD_GETMEM (THREAD_SELF, tid), 0))

{

   result = EDEADLK;

   break;

}

      /* Remember that we are a writer.  */

      if (++rwlock->__data.__nr_writers_queued == 0)

{

   /* Overflow on number of queued writers.  */

   --rwlock->__data.__nr_writers_queued;

   result = EAGAIN;

   break;

}

      int waitval = rwlock->__data.__writer_wakeup;

      /* Free the lock.  */

      lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);

      /* Wait for the writer or reader(s) to finish.  */

      lll_futex_wait (&rwlock->__data.__writer_wakeup, waitval,

        rwlock->__data.__shared);

      /* Get the lock.  */

      lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);

      /* To start over again, remove the thread from the writer list.  */

      --rwlock->__data.__nr_writers_queued;

    }

  /* We are done, free the lock.  */

  lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);

  return result;

}

pthread_rwlock_unlock.c:

int

__pthread_rwlock_unlock (pthread_rwlock_t *rwlock)

{

  LIBC_PROBE (rwlock_unlock, 1, rwlock);

  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);

  if (rwlock->__data.__writer)

    rwlock->__data.__writer = 0;

  else

    --rwlock->__data.__nr_readers;

  if (rwlock->__data.__nr_readers == 0)

    {

      if (rwlock->__data.__nr_writers_queued)

{

   ++rwlock->__data.__writer_wakeup;

   lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);

   lll_futex_wake (&rwlock->__data.__writer_wakeup, 1,

     rwlock->__data.__shared);

   return 0;

}

      else if (rwlock->__data.__nr_readers_queued)

{

   ++rwlock->__data.__readers_wakeup;

   lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);

   lll_futex_wake (&rwlock->__data.__readers_wakeup, INT_MAX,

     rwlock->__data.__shared);

   return 0;

}

    }

  lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);

  return 0;

}

整理一下实现的思路

几个标志控制变量的说明

rwlock->__data.__writer:非0表示当前被写者占用

rwlock->__data.__nr_readers:读者共享占用资源的读者数量

rwlock->__data.__nr_writers_queued:写者的锁请求数

rwlock->__data.__nr_readers_queued:读者的锁请求数

写锁执行流程:

1.lock锁定 (实现对一些公共的控制标志的顺序访问)

2.判断如果没有写者也没有读者在访问共享资源

   则将写者rwlock->__data.__writer 标记成自己.非零值 退出循环的自旋方式 lock解锁

3.   如果有别的在访问共享资源 则将写队列请求数加一(++rwlock->__data.__nr_writers_queued)

        表示有写请求要访问资源,然后释放lock锁定 等待其他的已经占用的线程访问完成。

4.其他的访问资源 完成后,在pthread_rwlock_unlock中判断如果没有读者占用了,唤醒第3步的等待

5.重新再lock锁定,将写者锁定请求数减一,重新回到2处判断。这时因为其他的读者锁请求没有唤醒 所以优先第2步就执行了也就是满足了读写锁的特征3.

读锁执行流程:

1.lock锁定 (实现对一些公共的控制标志的顺序访问)

2.判断如果没有写者且没有写者锁请求

   则将读者++rwlock->__data.__nr_readers 退出循环的自旋方式 lock解锁

3.   如果不符合条件 也就是有写者在占用共享资源或者有写锁请求在申请,则将读队列请求数加一(++rwlock->__data.__nr_readers_queued)

        表示有读者锁请求要访问资源,然后释放lock锁定 等待其他的已经占用的线程访问完成(其实就是等待写者释放占用)。

4.写者访问资源 完成后,在pthread_rwlock_unlock中判断如果没有读者占用了,唤醒第3步的等待

5.重新再lock锁定,将读者锁定请求数减一,重新回到2处判断。

解锁执行流程:

1.判断如果有写者标识 则将其置0 rwlock->__data.__writer =0

2.否则将读者拥有的减1  rwlock->__data.__nr_readers--

3.判断如果没有读者占用资源了,则判断是否有写锁请求,如果有优先唤醒退出 否则唤醒读锁请求

try相关的函数和上面的类似就不分析了。

四、基于读写锁的无锁的类似实现的一种模型

   以前写了一个类似的读写锁方式的一个无锁方式的 windows上的实现

   读方式:

   1.判断写标志是否可用,可用表示不能读 不可用表示能读

   2.如果不可用,则引用计数加1(原子操作InterlockedIncrement方式),表示占用资源了。

  3.访问共享资源 共享资源的数据层面加校验 校验不对 读错了 丢弃。

  4.引用技术减1(原子操作)

  写方式:

  1.置写标志可用

  2.超时时间内等待引用计数是否为0 (超时时间尽量长一点)

  3.写共享资源

  4.置引用计数为0

  5.置写标志不可用

五、总结

    用基本层面的lock锁加一些标志 就实现了新的一种控制方式。根本点是让争抢资源人为的干预控制,而不是无头苍蝇那种你争我夺。根据业务需求还可以调整用try无阻塞去做些别的事情然后回来再判断是否能访问共享资源.

   pthread中的无论读写都用同一lock锁定是否稍微影响了效率呢?根据具体的功能可以稍微变通一下。

给我留言

留言无头像?