cplus-plus.ru logo
Мы переехали на cplus-plus.ru
Главная страница В закладкиО сайтеКарта сайта
Хостинг от uCoz
Добавить в закладки

Меню сайта

Полезные ссылки

Наша рассылка
Подписаться на рассылку
"C++ : cplus-plus.ru :
Рассылка статей C++"


Друзья сайта
alsproject.ru Выбор выходного разделительного конденсатора

Приветствую Вас, Гость · rss 25-Апр-2024, 17:22
Главная » 2011 » Январь » 23 » Многопоточный Observer на С++ (практика)
12:35
Многопоточный Observer на С++ (практика)

Многопоточный Observer на С++ (практика)
Есть много вариаций на тему данного паттерна, но большинство примеров не подходит для многопоточных приложений.
  В этой статье я хочу поделится опытом применения паттерна в многопоточных приложениях и опишу основные проблемы, с которыми мне приходилось сталкиваться.

Цель данной стати — обратить внимание разработчиков на проблемы, с которыми можно столкнуться при создании многопоточных приложений. Выявить подводные камни в реализации коммуникации между компонентами в многопоточном приложении.

Если Вам необходимо готовое решение, обратите внимание на библиотеку Signals2, котрая включена в boost с мая 2009-го года.
  Я не пытаюсь предоставить решение, которое можно было бы использовать в готовом виде. Но тем не менее, ознакомившись с материалом, можно обойтись без использования сторонних библиотек, в тех проектах, в которых они по каким-либо причинам не доступны или нежелательны (драйвера, низкоуровневые приложения и т.п.).

Предметная область

Действующие лица
NotificationSender — объект, рассылающий сообщения.
Как правило это рабочий поток, извещающий об изменении своего состояния, которое необходимо отобразить на пользовательском интерфейсе.
NotificationListener — объект, реализующий обработку уведомлений.
Как правило это объект, который управляет отображением части пользовательского интерфейса связанного с фоновой задачей.
Таких объектов может быть множество, при этом они могут подключаться/отключаться динамически (к примеру открытие далогового окна, где показываются детали выполнения задачи)
NotificationDispatcher — объект, управляющий подписчиками и рассылкой сообщений.
Взаимодействие между объектами
Рассылка сообщений всем подписчикам.
Процесс подписки/прекшащения подписки.
Время жизни объектов.
  В данной статье описан метод синхронной рассылки сообщений. Это означает, что вызов функции SendMessage будет синхронным, и поток, вызывающий этот метод будет ожидать завершения обработки сообщений всеми подписчиками. В ряде случаев такой подход удобней ассинхронной рассылки, но при этом в нем есть трудности с прекращением подписки.

Простейшая реализация для однопоточной среды


typedef unsigned __int64 SubscriberId;
class CSubscriber
{
public:
virtual ~CSubscriber(){}
virtual void MessageHandler(void* pContext) = 0;
SubscriberId GetSubscriberId() {return (SubscriberId)this;}
};

class CDispatcher
{
private:
typedef std::vector<CSubscriber*> CSubscriberList;
public:
SubscriberId Subscribe(CSubscriber* pNewSubscriber)
{
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
{
return 0;
}
}
m_SubscriberList.push_back(pNewSubscriber);
return pNewSubscriber->GetSubscriberId();
}
bool Unsubscribe(SubscriberId id)
{
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == id)
{
m_SubscriberList.erase(m_SubscriberList.begin() + i);
return true;
}
}
return false;
}
void SendMessage(void* pContext)
{
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
m_SubscriberList[i]->MessageHandler(pContext);
}
}
private:
CSubscriberList m_SubscriberList;
};
Здесь уникальный идентификатор подписчика — адресс объекта подписчика, функция GetSubscriberId возвращает всегда одинаковое значение для одного объекта подписчика в не зависимости от преобразования типов.
Пример использования

class CListener:
public CSubscriber
{
virtual void MessageHandler(void* pContext)
{
wprintf(L"%d\n", *((int*)pContext));
}
};
int _tmain(int argc, _TCHAR* argv[])
{
CDispatcher Dispatcher;
CListener Listener1;
CListener Listener2;
Dispatcher.Subscribe(&Listener1);
Dispatcher.Subscribe(&Listener2);
for(int i = 0; i < 5; ++i)
{
Dispatcher.SendMessage(&i);
}
Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
return 0;
}

Отключение подписчика внутри обработчика сообщений

  В примере есть проблема, не связанная с многопоточностью. Эта проблема проявляется, когда мы пытаемся отписаться внутри обработчика MessageHandler. Данная проблемма будет решена копированием списка подписчиков перед вызовом MessageHandler.

Переходим к многопоточной среде

С одним потоком такой код будет работать довольно стабильно.
Давайте посмотрим что будет при работе нескольких потоков.

CDispatcher g_Dispatcher;
DWORD WINAPI WorkingThread(PVOID pParam)
{
for(int i = 0;;++i)
{
g_Dispatcher.SendMessage(&i);
}
};
int _tmain(int argc, _TCHAR* argv[])
{
::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
CListener Listener1;
CListener Listener2;
for(;;)
{
g_Dispatcher.Subscribe(&Listener1);
g_Dispatcher.Subscribe(&Listener2);

g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
}
return 0;
}
Рано или позно произойдет креш.
Проблема заключается в добавлении/удалении подписчиков и одновременной рассылке уведомлений (многопоточный доступ к CDispatcher::m_SubscriberList в нашем примере).
  Здесь необходима синхронизация доступа к списку подписчиков.

Синхронизация доступа к списку подписчиков


class CDispatcher
{
private:
typedef std::vector<CSubscriber*> CSubscriberList;
public:
SubscriberId Subscribe(CSubscriber* pNewSubscriber)
{
CScopeLocker ScopeLocker(m_Lock);
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
{
return 0;
}
}
m_SubscriberList.push_back(pNewSubscriber);
return pNewSubscriber->GetSubscriberId();
}
bool Unsubscribe(SubscriberId id)
{
CScopeLocker ScopeLocker(m_Lock);
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == id)
{
m_SubscriberList.erase(m_SubscriberList.begin() + i);
return true;
}
}
return false;
}
void SendMessage(void* pContext)
{
CScopeLocker ScopeLocker(m_Lock);
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
m_SubscriberList[i]->MessageHandler(pContext);
}
}
private:
CSubscriberList m_SubscriberList;
CLock m_Lock;
};
Синхронизация доступа была реализована при помощи объектов синхронизации (Critical section или Mutex).
Для большей переносимости и для того, чтобы не отвлекаться от сути происходящего, абстрагируемся от прямых вызовов платформенно-зависимых функций, типа EnterCriticalSection. Для этого служит класс CLock.
Для устойчивости к с++ исключениям удобно использовать технологию RAII, а именно класс CScopeLocker, который в конструкторе захватывает объект синхронизации, а в деструкторе освобождает его.
  При такой реализации программа не будет падать, но нас поджидает еще одна неприятная ситуация.

Борьба с взаимной блокировкой потоков (deadlock)

Допустим у нас есть некий поток, выполняющий какую-то фоновую задачу и есть окно, где отображается ход выполнения этой задачи.
Как правило, поток посылает уведомление классу окна, который в свою очередь вызывает системную функцию SendMessage, которая инициирует какие-то действия в контексте оконной процедуры.
Системная функция SendMessage является блокирующей, она отсылает сообщение потоку окна и ждет пока тот его обработает.
Если подключение/отключение объекта-слушателя будет происходить также в контексте оконной процедуры (в потоке окна) возможна взаимная блокировка потоков, так называемый deadlock.
Такой deadlock может воспроизоводится крайне редко (в момент вызова Subscribe/Unsubscribe и одновременном вызове MessageHandler в отдельном потоке)
  Следующий код эмулирует ситуацию с блокирующим вызовом системной ф-ции SendMessage.

CDispatcher g_Dispatcher;
CLock g_Lock;
class CListener:
public CSubscriber
{
virtual void MessageHandler(void* pContext)
{
//Эмулируем блокирующий вызов SendMessage
g_Lock.Lock();
wprintf(L"%d\n", *((int*)pContext));
g_Lock.Unlock();
}
};
DWORD WINAPI WorkingThread(PVOID pParam)
{
for(int i = 0;;++i)
{
g_Dispatcher.SendMessage(&i);
}
};
int _tmain(int argc, _TCHAR* argv[])
{
::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
CListener Listener1;
CListener Listener2;
for(;;)
{
//Эмулируем контекст оконной процедуры (обработчик оконного сообщения)
g_Lock.Lock();
g_Dispatcher.Subscribe(&Listener1);
g_Dispatcher.Subscribe(&Listener2);
g_Lock.Unlock();
Sleep(0);
g_Lock.Lock();
g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
g_Lock.Unlock();
}
return 0;
}
Проблема заключается в том, что главный поток захватывает глобальный объект синхронизации g_Lock (при аналогии с оконной процедурой — выполняется в контексте оконного потока), и затем вызывает метод Subscribe/Unsubscribe, который внутри пытается захватить второй объект синхронизации CDispatcher::m_Lock.
  В этот момент рабочий поток посылает уведомление, захватив при этом CDispatcher::m_Lock в функции CDispatcher::SendMessage, и затем пытается захватить глобальный объект синхронизации g_Lock (при аналогии с оконом — вызывает системную функцию SendMessage).
Поток окна A -> B
Рабочий поток B -> A
Это можно назвать класическим deadlock-ом.
Проблема скрывается в функции CDispatcher::SendMessage().
Здесь должно соблюдаться правило — нельзя вызывать callback-функцию захватив при этом какой-либо объект синхронизации.
  Итак, убираем блокировку при рассылке уведомлений.

void SendMessage(void* pContext)
{
CSubscriberList SubscriberList;
{
CScopeLocker ScopeLocker(m_Lock);
SubscriberList = m_SubscriberList;
}
for(size_t i = 0; i < SubscriberList.size(); ++i)
{
SubscriberList[i]->MessageHandler(pContext);
}
}

Контроль времени жизни подписчиков

После того, как мы убрали deadlock, появилась другая проблема — время жизни объектов-подписчиков.
У нас больше нет гарантии, что метод MessageHandler не будет вызван после вызова Unsubscribe, и по этому мы не можем удалять объект-подписчик непосредственно после вызова Unsubscribe.
В данной ситуации проще всего контролировать время жизни объектов-подписчиков с использованием счетчика ссылок.
Для этого можно исползовать технологию COM — унаследовать интерфейс CSubscriber от IUnknown и использовать ATL CComPtr для списка объектов-подписчиков, тоесть заменить std::vector<CSubscriber*> на std::vector<CComPtr>.
Но такая реализация чревата дополнительными расходами на реализацию классов-подписчиков, так как в каждом из них должны быть реализованы методы AddRef/Release и ненужный QueryInterface, хотя если в проекте активно используется COM, то такой подход может иметь приемущество.
  Для контроля времени жизни объектов-подписчиков с исползованием счетчика ссылок хорошо подойдут умные указатели.

Простая реализация для многопоточной среды


typedef unsigned __int64 SubscriberId;
class CSubscriber
{
public:
virtual ~CSubscriber(){}
virtual void MessageHandler(void* pContext) = 0;
SubscriberId GetSubscriberId() {return (SubscriberId)this;}
};
typedef boost::shared_ptr<CSubscriber> CSubscriberPtr;

class CDispatcher
{
private:
typedef std::vector<CSubscriberPtr> CSubscriberList;
public:
SubscriberId Subscribe(CSubscriberPtr pNewSubscriber)
{
CScopeLocker ScopeLocker(m_Lock);
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
{
return 0;
}
}
m_SubscriberList.push_back(pNewSubscriber);
return pNewSubscriber->GetSubscriberId();
}
bool Unsubscribe(SubscriberId id)
{
CSubscriberPtr toRelease;
CScopeLocker ScopeLocker(m_Lock);
for(size_t i = 0; i < m_SubscriberList.size(); ++i)
{
if(m_SubscriberList[i]->GetSubscriberId() == id)
{
toRelease = m_SubscriberList[i];
m_SubscriberList.erase(m_SubscriberList.begin() + i);
return true;
}
}
return false;
}
void SendMessage(void* pContext)
{
CSubscriberList SubscriberList;
{
CScopeLocker ScopeLocker(m_Lock);
SubscriberList = m_SubscriberList;
}
for(size_t i = 0; i < SubscriberList.size(); ++i)
{
SubscriberList[i]->MessageHandler(pContext);
}
}
private:
CSubscriberList m_SubscriberList;
CLock m_Lock;
};
В данной реализации я заменил «голый» указатель CSubscriber* на «умный» указатель со счетчиком ссылок, такой оказался в библиотеке boost.
Также в функцию Unsubscribe я добавил переменную toRelease для того, чтобы вызвать деструктор объекта-подписчика уже после вызова Unlock (нельзя вызывать callback-функцию, включая деструктор объекта подписчика, захватив при этом какой-либо объект синхронизации).
  Cтоит обратить внимание на то, что в функции SendMessage происходит копирование списка умных указателей (после копирования все указатели увеличивают свои счетчики ссылок, а при выходе из функции уменьшают, что и контролирует время жизни объектов-подписчиков)

Тестируем


CDispatcher g_Dispatcher;
CLock g_Lock;
class CListener:
public CSubscriber
{
virtual void MessageHandler(void* pContext)
{
//Эмулируем блокирующий вызов SendMessage
g_Lock.Lock();
wprintf(L"%d\n", *((int*)pContext));
g_Lock.Unlock();
}
};
DWORD WINAPI WorkingThread(PVOID pParam)
{
for(int i = 0;;++i)
{
g_Dispatcher.SendMessage(&i);
}
};
int _tmain(int argc, _TCHAR* argv[])
{
::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
for(;;)
{
boost::shared_ptr<CListener> pListener1(new CListener);
boost::shared_ptr<CListener> pListener2(new CListener);
//Эмулируем контекст оконной процедуры (обработчик оконного сообщения)
g_Lock.Lock();
g_Dispatcher.Subscribe(pListener1);
g_Dispatcher.Subscribe(pListener2);
g_Lock.Unlock();
Sleep(0);
g_Lock.Lock();
g_Dispatcher.Unsubscribe(pListener1->GetSubscriberId());
g_Dispatcher.Unsubscribe(pListener2->GetSubscriberId());
g_Lock.Unlock();
}
return 0;
}

Соптимизированная реализация для многопоточной среды

Как правило вызов функции SendMessage будет происходить намного чаще чем Subscribe/Unsubscribe. При большом количестве подписчиков узким местом может стать копирование списка подписчиков внутри SendMessage.
Копирование списка подписчиков можно перенести в функции Subscribe/Unsubscribe. Это будет похоже на методику из lock-free алгоритмов.
Объект CDispatcher будет хранить список подписчиков не на прямую, а при помощи умного указателя. Внутри функции SendMessage мы будем получать указатель на текущий список подписчиков и работать с ним. В функциях Subscribe/Unsubscribe мы будем каждый раз создавать новый список подписчиков и перенаправлять указатель внутри объекта CDispatcher на новый список подписчиков. Таким образом в то время, когда указатель на список подписчиков в объекте CDispatcher будет указывать уже на новый список подписчиков, ф-ция SendMessage по прежнему будет работать со старым списком. Так как старый список подписчиков никто не изменяет, то все будет работать стабильно в многопоточной среде.
В принципе, можно несколько модифицировать функции Subscribe/Unsubscribe и реализовать полностью lock-free алгоритм, но это уже другая тема.
Медот Unsubscribe является асинхронным и не гарантирует после своего завершения полное прекращение рассылки, половинное решение — подписчик получает уведомление о прекращении подписки при помощи ф-ции UnsubscribeHandler. Для реализации этого поведения добавлен промежуточный класс CSubscriberItem, который в своем деструкоторе вызывает ф-цию UnsubscribeHandler.

namespace Observer
{
//////////////////////////
// Subscriber
//////////////////////////
typedef unsigned __int64 SubscriberId;
class CSubscriber
{
public:
virtual ~CSubscriber(){}
virtual void MessageHandler(void* pContext) = 0;
virtual void UnsubscribeHandler() = 0;
SubscriberId GetSubscriberId() {return (SubscriberId)this;}
};
typedef boost::shared_ptr<CSubscriber> CSubscriberPtr;

//////////////////////////////////////////////////////////////////////
// Dispatcher
///////////////////////////////////
class CDispatcher
{
private:
class CSubscriberItem
{
public:
CSubscriberItem(CSubscriberPtr pSubscriber)
:m_pSubscriber(pSubscriber)
{
}
~CSubscriberItem()
{
m_pSubscriber->UnsubscribeHandler();
};
CSubscriberPtr Subscriber()const {return m_pSubscriber;}
private:
CSubscriberPtr m_pSubscriber;
};
typedef boost::shared_ptr<CSubscriberItem> CSubscriberItemPtr;
typedef std::vector<CSubscriberItemPtr> CSubscriberList;
typedef boost::shared_ptr<CSubscriberList> CSubscriberListPtr;
public:
CDispatcher()
{
}
private:
CDispatcher(const CDispatcher&){}
CDispatcher& operator=(const CDispatcher&){return *this;}
public:
SubscriberId Subscribe(CSubscriberPtr pNewSubscriber)
{
//Declaration of the next shared pointer before ScopeLocker
//prevents release of subscribers from under lock
CSubscriberListPtr pNewSubscriberList(new CSubscriberList());
//Enter to locked section
CScopeLocker ScopeLocker(m_Lock);
if(m_pSubscriberList)
{
//Copy existing subscribers
pNewSubscriberList->assign(m_pSubscriberList->begin(), m_pSubscriberList->end());
}
for(size_t i = 0; i < pNewSubscriberList->size(); ++i)
{
CSubscriberItemPtr pSubscriberItem = (*pNewSubscriberList)[i];
if(pSubscriberItem->Subscriber()->GetSubscriberId() == pNewSubscriber->GetSubscriberId())
{
return 0;
}
}
//Add new subscriber to new subscriber list
pNewSubscriberList->push_back(CSubscriberItemPtr(new CSubscriberItem(pNewSubscriber)));
//Exchange subscriber lists
m_pSubscriberList = pNewSubscriberList;
return pNewSubscriber->GetSubscriberId();
}
bool Unsubscribe(SubscriberId id)
{
//Declaration of the next shared pointers before ScopeLocker
//prevents release of subscribers from under lock
CSubscriberItemPtr pSubscriberItemToRelease;
CSubscriberListPtr pNewSubscriberList;
//Enter to locked section
CScopeLocker ScopeLocker(m_Lock);
if(!m_pSubscriberList)
{
//No subscribers
return false;
}
pNewSubscriberList = CSubscriberListPtr(new CSubscriberList());
for(size_t i = 0; i < m_pSubscriberList->size(); ++i)
{
CSubscriberItemPtr pSubscriberItem = (*m_pSubscriberList)[i];
if(pSubscriberItem->Subscriber()->GetSubscriberId() == id)
{
pSubscriberItemToRelease = pSubscriberItem;
}
else
{
pNewSubscriberList->push_back(pSubscriberItem);
}
}
//Exchange subscriber lists
m_pSubscriberList = pNewSubscriberList;
if(!pSubscriberItemToRelease.get())
{
return false;
}
return true;
}
void SendMessage(void* pContext)
{
CSubscriberListPtr pSubscriberList;
{
CScopeLocker ScopeLocker(m_Lock);
if(!m_pSubscriberList)
{
//No subscribers
return;
}
//Get shared pointer to an existing list of subscribers
pSubscriberList = m_pSubscriberList;
}
//pSubscriberList pointer to copy of subscribers' list
for(size_t i = 0; i < pSubscriberList->size(); ++i)
{
(*pSubscriberList)[i]->Subscriber()->MessageHandler(pContext);
}
}
private:
CSubscriberListPtr m_pSubscriberList;
CLock m_Lock;
};

}; //namespace Observer

Ссылки

Библиотека boost::signals2 статья
Умные указатели Джефф Элджер
Resource Acquisition Is Initialization (RAII) википедия
Комментарии к первой версии этой статьи можно найти здесь
Источник: http://habrahabr.ru
Категория: Новости | Просмотров: 1666 | Добавил: FazaNaka | Рейтинг: 5.0/1
Всего комментариев: 0
Добавлять комментарии могут только зарегистрированные пользователи.
[ Регистрация | Вход ]