摘要:前言對于多進(jìn)程多線程的應(yīng)用程序來說,保證數(shù)據(jù)正確的同步與更新離不開鎖和信號,中的鎖與信號基本采用系列函數(shù)實(shí)現(xiàn)。中的鎖類型有很多種互斥鎖自旋鎖文件鎖讀寫鎖原子鎖,本節(jié)就會講解中各種鎖的定義與使用。
前言
對于多進(jìn)程多線程的應(yīng)用程序來說,保證數(shù)據(jù)正確的同步與更新離不開鎖和信號,swoole 中的鎖與信號基本采用 pthread 系列函數(shù)實(shí)現(xiàn)。UNIX 中的鎖類型有很多種:互斥鎖、自旋鎖、文件鎖、讀寫鎖、原子鎖,本節(jié)就會講解 swoole 中各種鎖的定義與使用。
APUE 學(xué)習(xí)筆記——線程與鎖
APUE 學(xué)習(xí)筆記——高級 IO與文件鎖
數(shù)據(jù)結(jié)構(gòu)swoole 中無論哪種鎖,其數(shù)據(jù)結(jié)構(gòu)都是 swLock,這個數(shù)據(jù)結(jié)構(gòu)內(nèi)部有一個聯(lián)合體 object,這個聯(lián)合體可以是 互斥鎖、自旋鎖、文件鎖、讀寫鎖、原子鎖,type 可以指代這個鎖的類型,具體可選項(xiàng)是 SW_LOCKS 這個枚舉類型
該結(jié)構(gòu)體還定義了幾個函數(shù)指針,這幾個函數(shù)類似于各個鎖需要實(shí)現(xiàn)的接口,值得注意的是 lock_rd 和 trylock_rd兩個函數(shù)是專門為了 swFileLock 和 swRWLock 設(shè)計(jì)的,其他鎖沒有這兩個函數(shù)。
typedef struct _swLock { int type; union { swMutex mutex; #ifdef HAVE_RWLOCK swRWLock rwlock; #endif #ifdef HAVE_SPINLOCK swSpinLock spinlock; #endif swFileLock filelock; swSem sem; swAtomicLock atomlock; } object; int (*lock_rd)(struct _swLock *); int (*lock)(struct _swLock *); int (*unlock)(struct _swLock *); int (*trylock_rd)(struct _swLock *); int (*trylock)(struct _swLock *); int (*free)(struct _swLock *); } swLock; enum SW_LOCKS { SW_RWLOCK = 1, #define SW_RWLOCK SW_RWLOCK SW_FILELOCK = 2, #define SW_FILELOCK SW_FILELOCK SW_MUTEX = 3, #define SW_MUTEX SW_MUTEX SW_SEM = 4, #define SW_SEM SW_SEM SW_SPINLOCK = 5, #define SW_SPINLOCK SW_SPINLOCK SW_ATOMLOCK = 6, #define SW_ATOMLOCK SW_ATOMLOCK };互斥鎖
互斥鎖是最常用的進(jìn)程/線程鎖,swMutex 的基礎(chǔ)是 pthread_mutex 系列函數(shù), 因此該數(shù)據(jù)結(jié)構(gòu)只有兩個成員變量:_lock、attr:
typedef struct _swMutex { pthread_mutex_t _lock; pthread_mutexattr_t attr; } swMutex;互斥鎖的創(chuàng)建
互斥鎖的創(chuàng)建就是 pthread_mutex 互斥鎖的初始化,首先初始化互斥鎖的屬性 pthread_mutexattr_t attr,設(shè)定互斥鎖是否要進(jìn)程共享,之后設(shè)置各個關(guān)于鎖的函數(shù):
int swMutex_create(swLock *lock, int use_in_process) { int ret; bzero(lock, sizeof(swLock)); lock->type = SW_MUTEX; pthread_mutexattr_init(&lock->object.mutex.attr); if (use_in_process == 1) { pthread_mutexattr_setpshared(&lock->object.mutex.attr, PTHREAD_PROCESS_SHARED); } if ((ret = pthread_mutex_init(&lock->object.mutex._lock, &lock->object.mutex.attr)) < 0) { return SW_ERR; } lock->lock = swMutex_lock; lock->unlock = swMutex_unlock; lock->trylock = swMutex_trylock; lock->free = swMutex_free; return SW_OK; }互斥鎖函數(shù)
互斥鎖的函數(shù)就是調(diào)用相應(yīng)的 pthread_mutex 系列函數(shù):
static int swMutex_lock(swLock *lock) { return pthread_mutex_lock(&lock->object.mutex._lock); } static int swMutex_unlock(swLock *lock) { return pthread_mutex_unlock(&lock->object.mutex._lock); } static int swMutex_trylock(swLock *lock) { return pthread_mutex_trylock(&lock->object.mutex._lock); } static int swMutex_free(swLock *lock) { pthread_mutexattr_destroy(&lock->object.mutex.attr); return pthread_mutex_destroy(&lock->object.mutex._lock); } int swMutex_lockwait(swLock *lock, int timeout_msec) { struct timespec timeo; timeo.tv_sec = timeout_msec / 1000; timeo.tv_nsec = (timeout_msec - timeo.tv_sec * 1000) * 1000 * 1000; return pthread_mutex_timedlock(&lock->object.mutex._lock, &timeo); }讀寫鎖
對于讀多寫少的情況,讀寫鎖可以顯著的提高程序效率,swRWLock 的基礎(chǔ)是 pthread_rwlock 系列函數(shù):
typedef struct _swRWLock { pthread_rwlock_t _lock; pthread_rwlockattr_t attr; } swRWLock;讀寫鎖的創(chuàng)建
讀寫鎖的創(chuàng)建過程和互斥鎖類似:
int swRWLock_create(swLock *lock, int use_in_process) { int ret; bzero(lock, sizeof(swLock)); lock->type = SW_RWLOCK; pthread_rwlockattr_init(&lock->object.rwlock.attr); if (use_in_process == 1) { pthread_rwlockattr_setpshared(&lock->object.rwlock.attr, PTHREAD_PROCESS_SHARED); } if ((ret = pthread_rwlock_init(&lock->object.rwlock._lock, &lock->object.rwlock.attr)) < 0) { return SW_ERR; } lock->lock_rd = swRWLock_lock_rd; lock->lock = swRWLock_lock_rw; lock->unlock = swRWLock_unlock; lock->trylock = swRWLock_trylock_rw; lock->trylock_rd = swRWLock_trylock_rd; lock->free = swRWLock_free; return SW_OK; }讀寫鎖函數(shù)
static int swRWLock_lock_rd(swLock *lock) { return pthread_rwlock_rdlock(&lock->object.rwlock._lock); } static int swRWLock_lock_rw(swLock *lock) { return pthread_rwlock_wrlock(&lock->object.rwlock._lock); } static int swRWLock_unlock(swLock *lock) { return pthread_rwlock_unlock(&lock->object.rwlock._lock); } static int swRWLock_trylock_rd(swLock *lock) { return pthread_rwlock_tryrdlock(&lock->object.rwlock._lock); } static int swRWLock_trylock_rw(swLock *lock) { return pthread_rwlock_trywrlock(&lock->object.rwlock._lock); } static int swRWLock_free(swLock *lock) { return pthread_rwlock_destroy(&lock->object.rwlock._lock); }文件鎖
文件鎖是對多進(jìn)程、多線程同一時間寫相同文件這一場景設(shè)定的鎖,底層函數(shù)是 fcntl:
typedef struct _swFileLock { struct flock lock_t; int fd; } swFileLock;文件鎖的創(chuàng)建
int swFileLock_create(swLock *lock, int fd) { bzero(lock, sizeof(swLock)); lock->type = SW_FILELOCK; lock->object.filelock.fd = fd; lock->lock_rd = swFileLock_lock_rd; lock->lock = swFileLock_lock_rw; lock->trylock_rd = swFileLock_trylock_rd; lock->trylock = swFileLock_trylock_rw; lock->unlock = swFileLock_unlock; lock->free = swFileLock_free; return 0; }文件鎖函數(shù)
static int swFileLock_lock_rd(swLock *lock) { lock->object.filelock.lock_t.l_type = F_RDLCK; return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock); } static int swFileLock_lock_rw(swLock *lock) { lock->object.filelock.lock_t.l_type = F_WRLCK; return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock); } static int swFileLock_unlock(swLock *lock) { lock->object.filelock.lock_t.l_type = F_UNLCK; return fcntl(lock->object.filelock.fd, F_SETLKW, &lock->object.filelock); } static int swFileLock_trylock_rw(swLock *lock) { lock->object.filelock.lock_t.l_type = F_WRLCK; return fcntl(lock->object.filelock.fd, F_SETLK, &lock->object.filelock); } static int swFileLock_trylock_rd(swLock *lock) { lock->object.filelock.lock_t.l_type = F_RDLCK; return fcntl(lock->object.filelock.fd, F_SETLK, &lock->object.filelock); } static int swFileLock_free(swLock *lock) { return close(lock->object.filelock.fd); }自旋鎖
自旋鎖類似于互斥鎖,不同的是自旋鎖在加鎖失敗的時候,并不會沉入內(nèi)核,而是空轉(zhuǎn),這樣的鎖效率更高,但是會空耗 CPU
資源:
typedef struct _swSpinLock { pthread_spinlock_t lock_t; } swSpinLock;自旋鎖的創(chuàng)建
int swSpinLock_create(swLock *lock, int use_in_process) { int ret; bzero(lock, sizeof(swLock)); lock->type = SW_SPINLOCK; if ((ret = pthread_spin_init(&lock->object.spinlock.lock_t, use_in_process)) < 0) { return -1; } lock->lock = swSpinLock_lock; lock->unlock = swSpinLock_unlock; lock->trylock = swSpinLock_trylock; lock->free = swSpinLock_free; return 0; }自旋鎖函數(shù)
static int swSpinLock_lock(swLock *lock) { return pthread_spin_lock(&lock->object.spinlock.lock_t); } static int swSpinLock_unlock(swLock *lock) { return pthread_spin_unlock(&lock->object.spinlock.lock_t); } static int swSpinLock_trylock(swLock *lock) { return pthread_spin_trylock(&lock->object.spinlock.lock_t); } static int swSpinLock_free(swLock *lock) { return pthread_spin_destroy(&lock->object.spinlock.lock_t); }原子鎖
不同于以上幾種鎖,swoole 的原子鎖并不是 pthread 系列的鎖,而是自定義實(shí)現(xiàn)的。
typedef volatile uint32_t sw_atomic_uint32_t; typedef sw_atomic_uint32_t sw_atomic_t; typedef struct _swAtomicLock { sw_atomic_t lock_t; uint32_t spin; } swAtomicLock;原子鎖的創(chuàng)建
int swAtomicLock_create(swLock *lock, int spin) { bzero(lock, sizeof(swLock)); lock->type = SW_ATOMLOCK; lock->object.atomlock.spin = spin; lock->lock = swAtomicLock_lock; lock->unlock = swAtomicLock_unlock; lock->trylock = swAtomicLock_trylock; return SW_OK; }原子鎖的加鎖
static int swAtomicLock_lock(swLock *lock) { sw_spinlock(&lock->object.atomlock.lock_t); return SW_OK; }
原子鎖的加鎖邏輯函數(shù) sw_spinlock 非常復(fù)雜,具體步驟如下:
如果原子鎖沒有被鎖,那么調(diào)用原子函數(shù) sw_atomic_cmp_set(__sync_bool_compare_and_swap ) 進(jìn)行加鎖
若原子鎖已經(jīng)被加鎖,如果是單核,那么就調(diào)用 sched_yield 函數(shù)讓出執(zhí)行權(quán),因?yàn)檫@說明自旋鎖已經(jīng)被其他進(jìn)程加鎖,但是卻被強(qiáng)占睡眠,我們需要讓出控制權(quán)讓那個唯一的 cpu 把那個進(jìn)程跑下去,注意這時絕對不能進(jìn)行自選,否則就是死鎖。
如果是多核,就要不斷空轉(zhuǎn)的嘗試加鎖,防止睡眠,加鎖的嘗試間隔時間會指數(shù)增加,例如第一次 1 個時鐘周期,第二次 2 時鐘周期,第三次 4 時鐘周期...
間隔時間內(nèi)執(zhí)行的函數(shù) sw_atomic_cpu_pause 使用的是內(nèi)嵌的匯編代碼,目的在讓 cpu 空轉(zhuǎn),禁止線程或進(jìn)程被其他線程強(qiáng)占導(dǎo)致睡眠,恢復(fù)上下文浪費(fèi)時間。
如果超過了 SW_SPINLOCK_LOOP_N 次數(shù),還沒有能夠獲取的到鎖,那么也要讓出控制權(quán),這時很有可能被鎖保護(hù)的代碼有阻塞行為
#define sw_atomic_cmp_set(lock, old, set) __sync_bool_compare_and_swap(lock, old, set) #define sw_atomic_cpu_pause() __asm__ __volatile__ ("pause") #define swYield() sched_yield() //or usleep(1) static sw_inline void sw_spinlock(sw_atomic_t *lock) { uint32_t i, n; while (1) { if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1)) { return; } if (SW_CPU_NUM > 1) { for (n = 1; n < SW_SPINLOCK_LOOP_N; n <<= 1) { for (i = 0; i < n; i++) { sw_atomic_cpu_pause(); } if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1)) { return; } } } swYield(); } }原子鎖的函數(shù)
static int swAtomicLock_unlock(swLock *lock) { return lock->object.atomlock.lock_t = 0; } static int swAtomicLock_trylock(swLock *lock) { sw_atomic_t *atomic = &lock->object.atomlock.lock_t; return (*(atomic) == 0 && sw_atomic_cmp_set(atomic, 0, 1)); }信號量
信號量也是數(shù)據(jù)同步的一種重要方式,其數(shù)據(jù)結(jié)構(gòu)為:
typedef struct _swSem { key_t key; int semid; } swSem;信號量的創(chuàng)建
信號量的初始化首先需要調(diào)用 semget 創(chuàng)建一個新的信號量
semctl 會將信號量初始化為 0
int swSem_create(swLock *lock, key_t key) { int ret; lock->type = SW_SEM; if ((ret = semget(key, 1, IPC_CREAT | 0666)) < 0) { return SW_ERR; } if (semctl(ret, 0, SETVAL, 1) == -1) { swWarn("semctl(SETVAL) failed"); return SW_ERR; } lock->object.sem.semid = ret; lock->lock = swSem_lock; lock->unlock = swSem_unlock; lock->free = swSem_free; return SW_OK; }信號量的 V 操作
static int swSem_unlock(swLock *lock) { struct sembuf sem; sem.sem_flg = SEM_UNDO; sem.sem_num = 0; sem.sem_op = 1; return semop(lock->object.sem.semid, &sem, 1); }信號量的 P 操作
static int swSem_lock(swLock *lock) { struct sembuf sem; sem.sem_flg = SEM_UNDO; sem.sem_num = 0; sem.sem_op = -1; return semop(lock->object.sem.semid, &sem, 1); }信號量的銷毀
IPC_RMID 用于銷毀信號量
static int swSem_free(swLock *lock) { return semctl(lock->object.sem.semid, 0, IPC_RMID); }條件變量
條件變量并沒有作為 swLock 的一員,而是自成一體
條件變量不僅需要 pthread_cond_t,還需要互斥量 swLock
typedef struct _swCond { swLock _lock; pthread_cond_t _cond; int (*wait)(struct _swCond *object); int (*timewait)(struct _swCond *object, long, long); int (*notify)(struct _swCond *object); int (*broadcast)(struct _swCond *object); void (*free)(struct _swCond *object); int (*lock)(struct _swCond *object); int (*unlock)(struct _swCond *object); } swCond;條件變量的創(chuàng)建
int swCond_create(swCond *cond) { if (pthread_cond_init(&cond->_cond, NULL) < 0) { swWarn("pthread_cond_init fail. Error: %s [%d]", strerror(errno), errno); return SW_ERR; } if (swMutex_create(&cond->_lock, 0) < 0) { return SW_ERR; } cond->notify = swCond_notify; cond->broadcast = swCond_broadcast; cond->timewait = swCond_timewait; cond->wait = swCond_wait; cond->lock = swCond_lock; cond->unlock = swCond_unlock; cond->free = swCond_free; return SW_OK; }條件變量的函數(shù)
值得注意的是,條件變量的函數(shù)使用一定要結(jié)合 swCond_lock、swCond_unlock 等函數(shù)
static int swCond_notify(swCond *cond) { return pthread_cond_signal(&cond->_cond); } static int swCond_broadcast(swCond *cond) { return pthread_cond_broadcast(&cond->_cond); } static int swCond_timewait(swCond *cond, long sec, long nsec) { struct timespec timeo; timeo.tv_sec = sec; timeo.tv_nsec = nsec; return pthread_cond_timedwait(&cond->_cond, &cond->_lock.object.mutex._lock, &timeo); } static int swCond_wait(swCond *cond) { return pthread_cond_wait(&cond->_cond, &cond->_lock.object.mutex._lock); } static int swCond_lock(swCond *cond) { return cond->_lock.lock(&cond->_lock); } static int swCond_unlock(swCond *cond) { return cond->_lock.unlock(&cond->_lock); } static void swCond_free(swCond *cond) { pthread_cond_destroy(&cond->_cond); cond->_lock.free(&cond->_lock); }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/29220.html
摘要:在創(chuàng)建進(jìn)程和線程之間,主線程開始進(jìn)行信號處理函數(shù)的設(shè)置。事件循環(huán)結(jié)束前會調(diào)用函數(shù),該函數(shù)會檢查并執(zhí)行相應(yīng)的信號處理函數(shù)。 前言 信號處理是網(wǎng)絡(luò)庫不可或缺的一部分,不論是 ALARM、SIGTERM、SIGUSR1、SIGUSR2、SIGPIPE 等信號對程序的控制,還是 reactor、read、write 等操作被信號中斷的處理,都關(guān)系著整個框架程序的正常運(yùn)行。 Signal 數(shù)據(jù)...
摘要:修復(fù)添加超過萬個以上定時器時發(fā)生崩潰的問題增加模塊,下高性能序列化庫修復(fù)監(jiān)聽端口設(shè)置無效的問題等。線程來處理網(wǎng)絡(luò)事件輪詢,讀取數(shù)據(jù)。當(dāng)?shù)娜挝帐殖晒α艘院?,由這個線程將連接成功的消息告訴進(jìn)程,再由進(jìn)程轉(zhuǎn)交給進(jìn)程。此時進(jìn)程觸發(fā)事件。 本文示例代碼詳見:https://github.com/52fhy/swoo...。 簡介 Swoole是一個PHP擴(kuò)展,提供了PHP語言的異步多線程服務(wù)器...
摘要:清空主進(jìn)程殘留的定時器與信號。設(shè)定為執(zhí)行回調(diào)函數(shù)如果在回調(diào)函數(shù)中調(diào)用了異步系統(tǒng),啟動函數(shù)進(jìn)行事件循環(huán)。因此為了區(qū)分兩者,規(guī)定并不允許兩者同時存在。 前言 swoole-1.7.2 增加了一個進(jìn)程管理模塊,用來替代 PHP 的 pcntl 擴(kuò)展。 PHP自帶的pcntl,存在很多不足,如 pcntl 沒有提供進(jìn)程間通信的功能 pcntl 不支持重定向標(biāo)準(zhǔn)輸入和輸出 pcntl 只...
摘要:當(dāng)其就緒時,會調(diào)用執(zhí)行定時函數(shù)。進(jìn)程超時停止進(jìn)程將要停止時,并不會立刻停止,而是會等待事件循環(huán)結(jié)束后停止,這時為了防止進(jìn)程不退出,還設(shè)置了的延遲,超過就會停止該進(jìn)程。當(dāng)允許空閑時間小于時,統(tǒng)一每隔檢測空閑連接。 前言 swoole 的 timer 模塊功能有三個:用戶定時任務(wù)、剔除空閑連接、更新 server 時間。timer 模塊的底層有兩種,一種是基于 alarm 信號,一種是基于...
前言 作為一個網(wǎng)絡(luò)框架,最為核心的就是消息的接受與發(fā)送。高效的 reactor 模式一直是眾多網(wǎng)絡(luò)框架的首要選擇,本節(jié)主要講解 swoole 中的 reactor 模塊。 UNP 學(xué)習(xí)筆記——IO 復(fù)用 Reactor 的數(shù)據(jù)結(jié)構(gòu) Reactor 的數(shù)據(jù)結(jié)構(gòu)比較復(fù)雜,首先 object 是具體 Reactor 對象的首地址,ptr 是擁有 Reactor 對象的類的指針, event_nu...
閱讀 3279·2021-09-23 11:55
閱讀 2622·2021-09-13 10:33
閱讀 1674·2019-08-30 15:54
閱讀 3102·2019-08-30 15:54
閱讀 2369·2019-08-30 10:59
閱讀 2378·2019-08-29 17:08
閱讀 1809·2019-08-29 13:16
閱讀 3598·2019-08-26 12:25