Есть много вариаций на тему данного паттерна, но большинство примеров не подходит для многопоточных приложений.
В этой статье я хочу поделится опытом применения паттерна в
многопоточных приложениях и опишу основные проблемы, с которыми мне
приходилось сталкиваться.
Цель данной стати — обратить внимание разработчиков на проблемы, с
которыми можно столкнуться при создании многопоточных приложений.
Выявить подводные камни в реализации коммуникации между компонентами в
многопоточном приложении.
Если Вам необходимо готовое решение, обратите внимание на библиотеку Signals2, котрая включена в boost с
мая 2009-го года.
Я не пытаюсь предоставить решение, которое можно было бы использовать в
готовом виде. Но тем не менее, ознакомившись с материалом, можно
обойтись без использования сторонних библиотек, в тех проектах, в
которых они по каким-либо причинам не доступны или нежелательны
(драйвера, низкоуровневые приложения и т.п.).
Предметная область
Действующие лица
NotificationSender — объект, рассылающий сообщения.
Как правило это рабочий поток, извещающий об изменении своего
состояния, которое необходимо отобразить на пользовательском интерфейсе.
NotificationListener — объект, реализующий обработку уведомлений.
Как правило это объект, который управляет отображением части пользовательского интерфейса связанного с фоновой задачей.
Таких объектов может быть множество, при этом они могут
подключаться/отключаться динамически (к примеру открытие далогового
окна, где показываются детали выполнения задачи)
NotificationDispatcher — объект, управляющий подписчиками и рассылкой сообщений.
Взаимодействие между объектами
Рассылка сообщений всем подписчикам.
Процесс подписки/прекшащения подписки.
Время жизни объектов.
В данной статье описан метод синхронной рассылки сообщений. Это
означает, что вызов функции SendMessage будет синхронным, и поток,
вызывающий этот метод будет ожидать завершения обработки сообщений
всеми подписчиками. В ряде случаев такой подход удобней ассинхронной
рассылки, но при этом в нем есть трудности с прекращением подписки.
Простейшая реализация для однопоточной среды
typedef unsigned __int64 SubscriberId;
class CSubscriber
{
public:
virtual ~CSubscriber(){}
virtual void MessageHandler(void* pContext) = 0;
SubscriberId GetSubscriberId() {return (SubscriberId)this;}
};
class CDispatcher
{
private:
typedef std::vector<CSubscriber*> CSubscriberList;
public:
SubscriberId Subscribe(CSubscriber* pNewSubscriber)
{
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
{
return 0;
}
}
m_SubscriberList.push_back(pNewSubscriber);
return pNewSubscriber->GetSubscriberId();
}
bool Unsubscribe(SubscriberId id)
{
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == id)
{
m_SubscriberList.erase(m_SubscriberList.begin() + i);
return true;
}
}
return false;
}
void SendMessage(void* pContext)
{
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
m_SubscriberList[i]->MessageHandler(pContext);
}
}
private:
CSubscriberList m_SubscriberList;
};
Здесь уникальный идентификатор подписчика — адресс объекта подписчика,
функция GetSubscriberId возвращает всегда одинаковое значение для
одного объекта подписчика в не зависимости от преобразования типов.
Пример использования
class CListener:
public CSubscriber
{
virtual void MessageHandler(void* pContext)
{
wprintf(L"%d\n", *((int*)pContext));
}
};
int _tmain(int argc, _TCHAR* argv[])
{
CDispatcher Dispatcher;
CListener Listener1;
CListener Listener2;
Dispatcher.Subscribe(&Listener1);
Dispatcher.Subscribe(&Listener2);
for(int i = 0; i < 5; ++i)
{
Dispatcher.SendMessage(&i);
}
Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
return 0;
}
Отключение подписчика внутри обработчика сообщений
В примере есть проблема, не связанная с многопоточностью. Эта проблема
проявляется, когда мы пытаемся отписаться внутри обработчика
MessageHandler. Данная проблемма будет решена копированием списка
подписчиков перед вызовом MessageHandler.
Переходим к многопоточной среде
С одним потоком такой код будет работать довольно стабильно.
Давайте посмотрим что будет при работе нескольких потоков.
CDispatcher g_Dispatcher;
DWORD WINAPI WorkingThread(PVOID pParam)
{
for(int i = 0;;++i)
{
g_Dispatcher.SendMessage(&i);
}
};
int _tmain(int argc, _TCHAR* argv[])
{
::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
CListener Listener1;
CListener Listener2;
for(;;)
{
g_Dispatcher.Subscribe(&Listener1);
g_Dispatcher.Subscribe(&Listener2);
g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
}
return 0;
}
Рано или позно произойдет креш.
Проблема заключается в добавлении/удалении подписчиков и одновременной
рассылке уведомлений (многопоточный доступ к
CDispatcher::m_SubscriberList в нашем примере).
Здесь необходима синхронизация доступа к списку подписчиков.
Синхронизация доступа к списку подписчиков
class CDispatcher
{
private:
typedef std::vector<CSubscriber*> CSubscriberList;
public:
SubscriberId Subscribe(CSubscriber* pNewSubscriber)
{
CScopeLocker ScopeLocker(m_Lock);
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
{
return 0;
}
}
m_SubscriberList.push_back(pNewSubscriber);
return pNewSubscriber->GetSubscriberId();
}
bool Unsubscribe(SubscriberId id)
{
CScopeLocker ScopeLocker(m_Lock);
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == id)
{
m_SubscriberList.erase(m_SubscriberList.begin() + i);
return true;
}
}
return false;
}
void SendMessage(void* pContext)
{
CScopeLocker ScopeLocker(m_Lock);
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
m_SubscriberList[i]->MessageHandler(pContext);
}
}
private:
CSubscriberList m_SubscriberList;
CLock m_Lock;
};
Синхронизация доступа была реализована при помощи объектов синхронизации (Critical section или Mutex).
Для большей переносимости и для того, чтобы не отвлекаться от сути
происходящего, абстрагируемся от прямых вызовов платформенно-зависимых
функций, типа EnterCriticalSection. Для этого служит класс CLock.
Для устойчивости к с++ исключениям удобно использовать технологию RAII,
а именно класс CScopeLocker, который в конструкторе захватывает объект
синхронизации, а в деструкторе освобождает его.
При такой реализации программа не будет падать, но нас поджидает еще одна неприятная ситуация.
Борьба с взаимной блокировкой потоков (deadlock)
Допустим у нас есть некий поток, выполняющий какую-то фоновую задачу и есть окно, где отображается ход выполнения этой задачи.
Как правило, поток посылает уведомление классу окна, который в свою
очередь вызывает системную функцию SendMessage, которая инициирует
какие-то действия в контексте оконной процедуры.
Системная функция SendMessage является блокирующей, она отсылает сообщение потоку окна и ждет пока тот его обработает.
Если подключение/отключение объекта-слушателя будет происходить также в
контексте оконной процедуры (в потоке окна) возможна взаимная
блокировка потоков, так называемый deadlock.
Такой deadlock может воспроизоводится крайне редко (в момент вызова
Subscribe/Unsubscribe и одновременном вызове MessageHandler в отдельном
потоке)
Следующий код эмулирует ситуацию с блокирующим вызовом системной ф-ции SendMessage.
CDispatcher g_Dispatcher;
CLock g_Lock;
class CListener:
public CSubscriber
{
virtual void MessageHandler(void* pContext)
{
//Эмулируем блокирующий вызов SendMessage
g_Lock.Lock();
wprintf(L"%d\n", *((int*)pContext));
g_Lock.Unlock();
}
};
DWORD WINAPI WorkingThread(PVOID pParam)
{
for(int i = 0;;++i)
{
g_Dispatcher.SendMessage(&i);
}
};
int _tmain(int argc, _TCHAR* argv[])
{
::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
CListener Listener1;
CListener Listener2;
for(;;)
{
//Эмулируем контекст оконной процедуры (обработчик оконного сообщения)
g_Lock.Lock();
g_Dispatcher.Subscribe(&Listener1);
g_Dispatcher.Subscribe(&Listener2);
g_Lock.Unlock();
Sleep(0);
g_Lock.Lock();
g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
g_Lock.Unlock();
}
return 0;
}
Проблема заключается в том, что главный поток захватывает глобальный
объект синхронизации g_Lock (при аналогии с оконной процедурой —
выполняется в контексте оконного потока), и затем вызывает метод
Subscribe/Unsubscribe, который внутри пытается захватить второй объект
синхронизации CDispatcher::m_Lock.
В этот момент рабочий поток посылает уведомление, захватив при этом
CDispatcher::m_Lock в функции CDispatcher::SendMessage, и затем
пытается захватить глобальный объект синхронизации g_Lock (при аналогии
с оконом — вызывает системную функцию SendMessage).
Поток окна A -> B
Рабочий поток B -> A
Это можно назвать класическим deadlock-ом.
Проблема скрывается в функции CDispatcher::SendMessage().
Здесь должно соблюдаться правило — нельзя вызывать callback-функцию захватив при этом какой-либо объект синхронизации.
Итак, убираем блокировку при рассылке уведомлений.
void SendMessage(void* pContext)
{
CSubscriberList SubscriberList;
{
CScopeLocker ScopeLocker(m_Lock);
SubscriberList = m_SubscriberList;
}
for(size_t i = 0; i < SubscriberList.size(); ++i)
{
SubscriberList[i]->MessageHandler(pContext);
}
}
Контроль времени жизни подписчиков
После того, как мы убрали deadlock, появилась другая проблема — время жизни объектов-подписчиков.
У нас больше нет гарантии, что метод MessageHandler не будет вызван
после вызова Unsubscribe, и по этому мы не можем удалять
объект-подписчик непосредственно после вызова Unsubscribe.
В данной ситуации проще всего контролировать время жизни объектов-подписчиков с использованием счетчика ссылок.
Для этого можно исползовать технологию COM — унаследовать интерфейс
CSubscriber от IUnknown и использовать ATL CComPtr для списка
объектов-подписчиков, тоесть заменить std::vector<CSubscriber*>
на std::vector<CComPtr>.
Но такая реализация чревата дополнительными расходами на реализацию
классов-подписчиков, так как в каждом из них должны быть реализованы
методы AddRef/Release и ненужный QueryInterface, хотя если в проекте
активно используется COM, то такой подход может иметь приемущество.
Для контроля времени жизни объектов-подписчиков с исползованием счетчика ссылок хорошо подойдут умные указатели.
Простая реализация для многопоточной среды
typedef unsigned __int64 SubscriberId;
class CSubscriber
{
public:
virtual ~CSubscriber(){}
virtual void MessageHandler(void* pContext) = 0;
SubscriberId GetSubscriberId() {return (SubscriberId)this;}
};
typedef boost::shared_ptr<CSubscriber> CSubscriberPtr;
class CDispatcher
{
private:
typedef std::vector<CSubscriberPtr> CSubscriberList;
public:
SubscriberId Subscribe(CSubscriberPtr pNewSubscriber)
{
CScopeLocker ScopeLocker(m_Lock);
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
{
return 0;
}
}
m_SubscriberList.push_back(pNewSubscriber);
return pNewSubscriber->GetSubscriberId();
}
bool Unsubscribe(SubscriberId id)
{
CSubscriberPtr toRelease;
CScopeLocker ScopeLocker(m_Lock);
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == id)
{
toRelease = m_SubscriberList[i];
m_SubscriberList.erase(m_SubscriberList.begin() + i);
return true;
}
}
return false;
}
void SendMessage(void* pContext)
{
CSubscriberList SubscriberList;
{
CScopeLocker ScopeLocker(m_Lock);
SubscriberList = m_SubscriberList;
}
for(size_t i = 0; i < SubscriberList.size(); ++i)
{
SubscriberList[i]->MessageHandler(pContext);
}
}
private:
CSubscriberList m_SubscriberList;
CLock m_Lock;
};
В данной реализации я заменил «голый» указатель CSubscriber* на «умный»
указатель со счетчиком ссылок, такой оказался в библиотеке boost.
Также в функцию Unsubscribe я добавил переменную toRelease для того,
чтобы вызвать деструктор объекта-подписчика уже после вызова Unlock
(нельзя вызывать callback-функцию, включая деструктор объекта
подписчика, захватив при этом какой-либо объект синхронизации).
Cтоит обратить внимание на то, что в функции SendMessage происходит
копирование списка умных указателей (после копирования все указатели
увеличивают свои счетчики ссылок, а при выходе из функции уменьшают,
что и контролирует время жизни объектов-подписчиков)
Тестируем
CDispatcher g_Dispatcher;
CLock g_Lock;
class CListener:
public CSubscriber
{
virtual void MessageHandler(void* pContext)
{
//Эмулируем блокирующий вызов SendMessage
g_Lock.Lock();
wprintf(L"%d\n", *((int*)pContext));
g_Lock.Unlock();
}
};
DWORD WINAPI WorkingThread(PVOID pParam)
{
for(int i = 0;;++i)
{
g_Dispatcher.SendMessage(&i);
}
};
int _tmain(int argc, _TCHAR* argv[])
{
::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
for(;;)
{
boost::shared_ptr<CListener> pListener1(new CListener);
boost::shared_ptr<CListener> pListener2(new CListener);
//Эмулируем контекст оконной процедуры (обработчик оконного сообщения)
g_Lock.Lock();
g_Dispatcher.Subscribe(pListener1);
g_Dispatcher.Subscribe(pListener2);
g_Lock.Unlock();
Sleep(0);
g_Lock.Lock();
g_Dispatcher.Unsubscribe(pListener1->GetSubscriberId());
g_Dispatcher.Unsubscribe(pListener2->GetSubscriberId());
g_Lock.Unlock();
}
return 0;
}
Соптимизированная реализация для многопоточной среды
Как правило вызов функции SendMessage будет происходить намного чаще
чем Subscribe/Unsubscribe. При большом количестве подписчиков узким
местом может стать копирование списка подписчиков внутри SendMessage.
Копирование списка подписчиков можно перенести в функции
Subscribe/Unsubscribe. Это будет похоже на методику из lock-free
алгоритмов.
Объект CDispatcher будет хранить список подписчиков не на прямую, а при
помощи умного указателя. Внутри функции SendMessage мы будем получать
указатель на текущий список подписчиков и работать с ним. В функциях
Subscribe/Unsubscribe мы будем каждый раз создавать новый список
подписчиков и перенаправлять указатель внутри объекта CDispatcher на
новый список подписчиков. Таким образом в то время, когда указатель на
список подписчиков в объекте CDispatcher будет указывать уже на новый
список подписчиков, ф-ция SendMessage по прежнему будет работать со
старым списком. Так как старый список подписчиков никто не изменяет, то
все будет работать стабильно в многопоточной среде.
В принципе, можно несколько модифицировать функции
Subscribe/Unsubscribe и реализовать полностью lock-free алгоритм, но
это уже другая тема.
Медот Unsubscribe является асинхронным и не гарантирует после своего
завершения полное прекращение рассылки, половинное решение — подписчик
получает уведомление о прекращении подписки при помощи ф-ции
UnsubscribeHandler. Для реализации этого поведения добавлен
промежуточный класс CSubscriberItem, который в своем деструкоторе
вызывает ф-цию UnsubscribeHandler.
namespace Observer
{
//////////////////////////
// Subscriber
//////////////////////////
typedef unsigned __int64 SubscriberId;
class CSubscriber
{
public:
virtual ~CSubscriber(){}
virtual void MessageHandler(void* pContext) = 0;
virtual void UnsubscribeHandler() = 0;
SubscriberId GetSubscriberId() {return (SubscriberId)this;}
};
typedef boost::shared_ptr<CSubscriber> CSubscriberPtr;
//////////////////////////////////////////////////////////////////////
// Dispatcher
///////////////////////////////////
class CDispatcher
{
private:
class CSubscriberItem
{
public:
CSubscriberItem(CSubscriberPtr pSubscriber)
:m_pSubscriber(pSubscriber)
{
}
~CSubscriberItem()
{
m_pSubscriber->UnsubscribeHandler();
};
CSubscriberPtr Subscriber()const {return m_pSubscriber;}
private:
CSubscriberPtr m_pSubscriber;
};
typedef boost::shared_ptr<CSubscriberItem> CSubscriberItemPtr;
typedef std::vector<CSubscriberItemPtr> CSubscriberList;
typedef boost::shared_ptr<CSubscriberList> CSubscriberListPtr;
public:
CDispatcher()
{
}
private:
CDispatcher(const CDispatcher&){}
CDispatcher& operator=(const CDispatcher&){return *this;}
public:
SubscriberId Subscribe(CSubscriberPtr pNewSubscriber)
{
//Declaration of the next shared pointer before ScopeLocker
//prevents release of subscribers from under lock
CSubscriberListPtr pNewSubscriberList(new CSubscriberList());
//Enter to locked section
CScopeLocker ScopeLocker(m_Lock);
if(m_pSubscriberList)
{
//Copy existing subscribers
pNewSubscriberList->assign(m_pSubscriberList->begin(), m_pSubscriberList->end());
}
for(size_t i = 0; i < pNewSubscriberList->size(); ++i)
{
CSubscriberItemPtr pSubscriberItem = (*pNewSubscriberList)[i];
if(pSubscriberItem->Subscriber()->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
{
return 0;
}
}
//Add new subscriber to new subscriber list
pNewSubscriberList->push_back(CSubscriberItemPtr(new CSubscriberItem(pNewSubscriber)));
//Exchange subscriber lists
m_pSubscriberList = pNewSubscriberList;
return pNewSubscriber->GetSubscriberId();
}
bool Unsubscribe(SubscriberId id)
{
//Declaration of the next shared pointers before ScopeLocker
//prevents release of subscribers from under lock
CSubscriberItemPtr pSubscriberItemToRelease;
CSubscriberListPtr pNewSubscriberList;
//Enter to locked section
CScopeLocker ScopeLocker(m_Lock);
if(!m_pSubscriberList)
{
//No subscribers
return false;
}
pNewSubscriberList = CSubscriberListPtr(new CSubscriberList());
for(size_t i = 0; i < m_pSubscriberList->size(); ++i)
{
CSubscriberItemPtr pSubscriberItem = (*m_pSubscriberList)[i];
if(pSubscriberItem->Subscriber()->GetSubscriberId() == id)
{
pSubscriberItemToRelease = pSubscriberItem;
}
else
{
pNewSubscriberList->push_back(pSubscriberItem);
}
}
//Exchange subscriber lists
m_pSubscriberList = pNewSubscriberList;
if(!pSubscriberItemToRelease.get())
{
return false;
}
return true;
}
void SendMessage(void* pContext)
{
CSubscriberListPtr pSubscriberList;
{
CScopeLocker ScopeLocker(m_Lock);
if(!m_pSubscriberList)
{
//No subscribers
return;
}
//Get shared pointer to an existing list of subscribers
pSubscriberList = m_pSubscriberList;
}
//pSubscriberList pointer to copy of subscribers' list
for(size_t i = 0; i < pSubscriberList->size(); ++i)
{
(*pSubscriberList)[i]->Subscriber()->MessageHandler(pContext);
}
}
private:
CSubscriberListPtr m_pSubscriberList;
CLock m_Lock;
};
}; //namespace Observer
Ссылки
Библиотека boost::signals2
статья
Умные указатели
Джефф Элджер
Resource Acquisition Is Initialization (RAII)
википедия
Комментарии к первой версии этой статьи можно найти
здесь