Синхронизация потоков

Работая параллельно, потоки совместно используют адресное пространство процесса. Также все они имеют доступ к описателям (handles) открытых в процессе объектов. А что делать, если несколько потоков одновременно обращаются к одному ресурсу или необходимо как-то упорядочить работу потоков? Для этого используют объекты синхронизации и соответствующие механизмы.

Мьютексы.

Мьютексы (Mutex) это объекты ядра, которые создаются функцией CreateMutex(). Мьютекс бывает в двух состояниях - занятом и свободном. Мьютексом хорошо защищать единичный ресурс от одновременного обращения к нему разными потоками.

Пример 3. Допустим, в программе используется ресурс, например, файл или буфер в памяти. Функция WriteToBuffer() вызывается из разных потоков. Чтобы избежать коллизий при одновременном обращении к буферу из разных потоков, используем мьютекс. Прежде чем обратиться к буферу, ожидаем <освобождения> мютекса.
  HANDLE hMutex;   int main() { hMutex = CreateMutex( NULL, FALSE, NULL); // Создаем мьютекс в свободном состоянии ... // Создание потоков, и т.д. ... } BOOL WriteToBuffer() { DWORD dwWaitResult; // Ждем освобождения мьютекса перед тем как обратиться к буферу. dwWaitResult = WaitForSingleObject( hMutex, 5000L); // 5 секунд на таймаут   if (dwWaitResult == WAIT_TIMEOUT) // Таймаут. Мьютекс за это время не освободился. { return FALSE; } else// Мьютекс освободился, и наш поток его занял. Можно работать. { Write_to_the_buffer(). ... ReleaseMutex(hMutex); // Освобождаем мьютекс. } return TRUE; }

Семафоры.

Семафор (Semaphore) создается функцией CreateSemaphore(). Он очень похож на мьютекс, только в отличие от него у семафора есть счетчик. Семафор открыт если счетчик больше 0 и закрыт, если счетчик равен 0. Семафором обычно "огораживают" наборы равнозначных ресурсов (элементов), например очередь, список и т.п.

Пример 4. Классический пример использования семафора это очередь элементов, которую обрабатывают несколько потоков. Потоки "разбирают" элементы из очереди. Если очередь пуста, потоки должны "спать", ожидая появления новых элементов. Для учета элементов в очереди используется семафор.
  class CMyQueue { HANDLE m_hSemaphore; // Семафор для учета элементов очереди // Описание других объектов для хранения элементов очереди   public: CMyQueue() { m_hSemaphore = CreateSemaphore(NULL, 0, 1000, NULL); //начальное значение счетчика = 0 //максимальное значение = 1000 // Инициализация других объектов } ~CMyQueue() { CloseHandle( m_hSemaphore); // Удаление других объектов } void AddItem(void * NewItem) { // Добавляем элемент в очередь // Увеличиваем счетчик семафора на 1. ReleaseSemaphore(m_hSemaphore,1, NULL); } void GetItem(void * Item) { // Если очередь пуста, то потоки, вызвавшие этот метод, // будут находиться в ожидании... WaitForSingleObject(m_hSemaphore,INFINITE);   // Удаляем элемент из очереди } };


Замечание. В этом примере мы считаем, что сами процедуры добавления элемента в очередь и удаления из очереди безопасны с точки зрения многопоточности. Не будем пока касаться деталей их реализации. Подробнее мы рассмотрим это в примере 9.

События.

События (Event), также как и мьютексы имеют два состояния - установленное и сброшенное. События бывают со сбросом вручную и с автосбросом. Когда поток дождался (wait-функция вернула управление) события с автосбросом, такое событие автоматически сбрасывается. В противном случае событие нужно сбрасывать вручную, вызвав функцию ResetEvent(). Допустим, сразу несколько потоков ожидают одного и того же события, и событие сработало. Если это было событие с автосбросом, то оно позволит работать только одному потоку (ведь сразу же после возврата из его wait-функции событие сбросится автоматически!), а остальные потоки останутся ждать. Если же это было событие со сбросом вручную, то все потоки получат управление, а событие так и останется в установленном состоянии, пока какой-нибудь поток не вызовет ResetEvent().

Пример 5. Вот еще один пример многопоточного приложения. Программа имеет два потока; один готовит данные, а второй отсылает их на сервер. Разумно распараллелить их работу. Здесь потоки должны работать по очереди. Сначала первый поток готовит порцию данных. Потом второй поток отправляет ее, а первый тем временем готовит следующую порцию и т.д. Для такой синхронизации понадобится два event'а с автосбросом.
  unsigned __stdcall CaptureThreadFunc( void * arg) // Поток, готовящий данные { while (bSomeCondition) { WaitForSingleObject(m_hEventForCaptureTh,INFINITE); // Ждем своего события ... // Готовим данные SetEvent(hEventForTransmitTh); // Разрешаем работать второму потоку } _endthreadex( 0 ); return 0; };   unsigned __stdcall TransmitThreadFunc( void * arg) // Поток, отсылающий данные. { while (bSomeCondition) { WaitForSingleObject(m_hEventForTransmitTh,INFINITE); // Ждем своего события ... // Данные готовы, формируем из них пакет для отправки SetEvent(hEventForCaptureTh); // Разрешаем работать первому потоку, а сами... ... // отправляем пакет } _endthreadex( 0 ); return 0; };   int main(int argc, char* argv[]) // Основной поток { // Создаем два события с автосбросом, со сброшенным начальным состоянием hEventForCaptureTh = CreateEvent(NULL,FALSE,FALSE,NULL); hEventForTransmitTh = CreateEvent(NULL,FALSE,FALSE,NULL);   // Создаем потоки hCaptureTh = (HANDLE)_beginthreadex( NULL, 0, &CaptureThreadFunc, 0, 0,&uTh1); hTransmitTh = (HANDLE)_beginthreadex( NULL, 0, &TransmitThreadFunc, 0, 0,&uTh2); // Запускаем первый поток SetEvent(hEventForCaptureTh);   .... }
Пример 6. Другой пример. Программа непрерывно в цикле производит какие-то вычисления. Нужно иметь возможность приостановить на время ее работу. Допустим, это просмотрщик видео файлов, который в цикле, кадр за кадром отображает информацию на экран. Не будем вдаваться в подробности видео функций. Реализуем функции Pause и Play для программы. Используем событие со сбросом вручную.
  // Главная функция потока, которая в цикле отображает кадры unsigned __stdcall VideoThreadFunc( void * arg) { while (bSomeCondition) { WaitForSingleObject(m_hPauseEvent,INFINITE); // Если событие сброшено, ждем ... // Отображаем очередной кадр на экран } _endthreadex( 0 ); return 0; };   void Play() { SetEvent(m_hPauseEvent); };   void Pause() { ResetEvent(m_hPauseEvent); };

Функция PulseEvent() устанавливает событие и тут же переводит его обратно в сброшенное состояние; ее вызов равнозначен последовательному вызову SetEvent() и ResetEvent(). Если PulseEvent вызывается для события со сбросом в ручную, то все потоки, ожидающие этот объект, получают управление. При вызове PulseEvent для события с автосбросом пробуждается только один из ждущих потоков. А если ни один из потоков не ждет объект-событие, вызов функции не дает никакого эффекта.

Пример 7. Реализуем функцию NextFrame() для предыдущего примера для промотки файла вручную по кадрам.
  void NextFrame() { PulseEvent(m_hPauseEvent); };

Ожидаемые таймеры

Пожалуй, ожидаемые таймеры - самый изощренный объект ядра для синхронизации. Появились они, начиная с Windows 98. Таймеры создаются функцией CreateWaitableTimer и бывают, также как и события, с автосбросом и без него. Затем таймер надо настроить функцией SetWaitableTimer. Таймер переходит в сигнальное состояние, когда истекает его таймаут. Отменить "тиканье" таймера можно функцией CancelWaitableTimer. Примечательно, что можно указать callback функцию при установке таймера. Она будет выполняться, когда срабатывает таймер.

Пример 8. Напишем программу-будильник используя WaitableTimer'ы. Будильник будет срабатыват раз в день в 8 утра и "пикать" 10 раз. Используем для этого два таймера, один из которых с callback-функцией.
  #include<process.h> #include<windows.h> #include<stdio.h> #include<conio.h>   #define HOUR (8) // время, когда срабатывает будильник (только часы) #define RINGS (10) // сколько раз пикать   HANDLE hTerminateEvent ;   // callback функция таймера VOID CALLBACK TimerAPCProc(LPVOID, DWORD, DWORD) { Beep(1000,500); // звоним! };   // функция потока unsigned __stdcall ThreadFunc(void *) { HANDLE hDayTimer = CreateWaitableTimer(NULL,FALSE,NULL); HANDLE hAlarmTimer = CreateWaitableTimer(NULL,FALSE,NULL); HANDLE h[2]; // мы будем ждать эти объекты h[0] = hTerminateEvent; h[1] = hDayTimer; int iRingCount=0; // число "звонков" int iFlag; DWORD dw;   // немного помучаемся со временем, //т.к. таймер принимает его только в формате FILETIME LARGE_INTEGER liDueTime, liAllDay; liDueTime.QuadPart=0; // сутки в 100-наносекундных интервалах = 10000000 * 60 * 60 * 24 = 0xC92A69C000 liAllDay.QuadPart = 0xC9; liAllDay.QuadPart=liAllDay.QuadPart << 32; liAllDay.QuadPart |= 0x2A69C000; SYSTEMTIME st; GetLocalTime(&st); // узнаем текущую дату/время iFlag = st.wHour > HOUR; // если назначенный час еще не наступил, //то ставим будильник на сегодня, иначе - на завтра st.wHour = HOUR; st.wMinute = 0; st.wSecond =0; FILETIME ft; SystemTimeToFileTime( &st, &ft); if (iFlag) ((LARGE_INTEGER *)&ft)->QuadPart = ((LARGE_INTEGER *)&ft)->QuadPart +liAllDay.QuadPart ;   LocalFileTimeToFileTime(&ft,&ft); // Устанавливаем таймер, // он будет срабатывать раз в сутки (24*60*60*1000ms), // начиная со следующего "часа пик" - HOUR SetWaitableTimer(hDayTimer, (LARGE_INTEGER *) &ft, 24*60*60000, 0, 0, 0); do { dw = WaitForMultipleObjectsEx(2,h,FALSE,INFINITE,TRUE); if (dw == WAIT_OBJECT_0 +1) // сработал hDayTimer { // Устанавливаем таймер, он будет вызывать callback ф-ию раз в секунду, // начнет с текущего момента SetWaitableTimer(hAlarmTimer, &liDueTime, 1000, TimerAPCProc, NULL, 0); iRingCount=0; } if (dw == WAIT_IO_COMPLETION) // закончила работать callback ф-ия { iRingCount++; if (iRingCount==RINGS) CancelWaitableTimer(hAlarmTimer); } }while (dw!= WAIT_OBJECT_0); // пока не сработало hTerminateEvent крутимся в цикле   // закрывае handles, выходим CancelWaitableTimer(hDayTimer); CancelWaitableTimer(hAlarmTimer); CloseHandle(hDayTimer); CloseHandle(hAlarmTimer); _endthreadex( 0 ); return 0; };   int main(int argc, char* argv[]) { // это событие показывае потоку когда надо завершаться hTerminateEvent = CreateEvent(NULL,FALSE,FALSE,NULL); unsigned uThreadID; HANDLE hThread; // создаем поток hThread = (HANDLE)_beginthreadex( NULL, 0, &ThreadFunc, 0, 0,&uThreadID); puts("Press any key to exit."); // ждем any key от пользователя для завершения программы getch(); // выставляем событие SetEvent(hTerminateEvent); // ждем завершения потока WaitForSingleObject(hThread, INFINITE ); // закрываем handle CloseHandle( hThread ); return 0; }

Критические секции. Синхронизация в пользовательском режиме.

Критическая секция гарантирует вам, что куски кода программы, огороженные ей, не будут выполняться одновременно. Строго говоря, критическая секция не является объектом ядра. Она представляет собой структуру, содержащую несколько флагов и какой-то (не важно) объект ядра. При входе в критическую секцию сначала проверяются флаги, и если выясняется, что она уже занята другим потоком, то выполняется обычная wait-функция. Критическая секция примечательна тем, что для проверки, занята она или нет, программа не переходит в режим ядра (не выполняется wait-функция) а лишь проверяются флаги. Из-за этого считается, что синхронизация с помощью критических секций наиболее быстрая. Такую синхронизацию называют "синхронизация в пользовательском режиме".

Пример 9. Снова рассмотрим очередь элементов. Один из вариантов ее реализации - двусвязный список. С точки зрения многопоточности, опасными являются операции добавления и удаления элементов из очереди. Существует вероятность, что несколько потоков одновременно начнут перестраивать указатели и связность очереди нарушится. Чтобы этого избежать, используем критическую секцию.