Обзор технологии Microsoft StreamInsight. Часть 3

Пример

Теперь пора перейти к примеру. Я буду использовать пример, идущий в комплекте с SQL Server 2008 R2 Update for Developers Training Kit, который называется HighwayMonitor.

Задача:

Имеется шоссе с 8 рядами в каждую сторону. Имеется 6 точек наблюдения, с которых поступают сигналы. Измеряется скорость едущих машин.

Существует пять типов машин:

  • Скорая (Ambulance)
  • Автобус (Bus)
  • Грузовик (Truck)
  • Такси (Taxi)
  • Легковой автомобить (Car)

Мы не будем ставить каких-то конкретных задач по анализу данных, на этом примере мы просто убедимся в том, что StreamInsight может справляться с задачей обратки данных, написание более разумных запросов остается читателям в качестве самостоятельной работы.

Пока будем просто выводить события в DataGridView.

Открываем проект в Visual Studio 2010.

В окне Solution Explorer посмотрим на структуру проекта:

Позже мы рассмотрим классы, содержащиеся в проекте, пока нужно отметить лишь то, что в проект добавлены ссылки на библиотеки StreamInsight, которые могут быть найдены в папке, в которую был установлен StreamInsight.

Определим формат событий в файле TollPointTypes.cs


public
class
TollPointEvent : IRandomInit

{


public
static
int TollPoints = 6;


public
static
int Lanes = 8;


public
static
Random rand;


public
Guid EventID;


public
Int32 TollPointId;


public
Int32 DirectionId;


public
Int32 Lane;


public
Int32 VehicleTypeId;


public
string TagId;


public
DateTime EnterGate;


public
Int32 MillisecondsToPassSpeedCheckPoint;


public
DateTime ExitGate;


public TollPointEvent()

{

}


public
void Init()

{


if (null == rand)

{

rand = new
Random();

}


this.EventID = Guid.NewGuid();


this.TollPointId = rand.Next(TollPoints);


this.DirectionId = PickDirection(rand);


this.Lane = rand.Next(Lanes);


this.VehicleTypeId = PickVehicleTypeId(rand);


this.TagId = rand.Next(Int32.MaxValue).ToString();


this.ExitGate = DateTime.Now;


Int32 vehicleSpeed = rand.Next(30, 120); // generate car speeds between 30 and 120 kph


//Int32 vehicleSpeed = 80;


Int32 vehicleLength = PickLength(rand, this.VehicleTypeId);


this.EnterGate = CalcEnter(this.ExitGate, vehicleSpeed, vehicleLength);


this.MillisecondsToPassSpeedCheckPoint = CalcPassSpeedCheckPoint(this.EnterGate, vehicleSpeed);


Trace.WriteLine(this.EventID.ToString());

}

}

Также в классе содержатся дополнительные методы, но для нас они не слишком важны.

В файле TollPointInput.cs содержится описание входного адаптера. Поскольку у нас нет реальных датчиков, адаптер самостоятельно создает события.


public
class
TollPointInput<TollPointEvent> : TypedPointInputAdapter<TollPointEvent>

{


private
TollPointInputConfig _config;


public TollPointInput(TollPointInputConfig config)

{

_config = config;

}


public
override
void Resume()

{

ProduceEvents();

}


public
override
void Start()

{

ProduceEvents();

}


private
void ProduceEvents()

{


PointEvent<TollPointEvent> currEvent = default(PointEvent<TollPointEvent>);


EnqueueOperationResult result = EnqueueOperationResult.Full;


Random rand = new
Random();


while (!HighwayMonitor.Program.MainWindow.NeedToStop)

{


if (AdapterState.Stopping == AdapterState)

{

Stopped();


return;

}

currEvent = CreateInsertEvent();


if (null == currEvent)

{


continue;

}

currEvent.StartTime = DateTime.Now;


currEvent.Payload = (TollPointEvent)Activator.CreateInstance(typeof(TollPointEvent));

(currEvent.Payload as
IRandomInit).Init();

result = Enqueue(ref currEvent);


if (EnqueueOperationResult.Full == result)

{

ReleaseEvent(ref currEvent);

Ready();


return;

}


Thread.Sleep(rand.Next(1,500));

}


this.Stopped();

}

}

Мы знаем параметры событий (TollPointEvent), а наши события являются мгновенными, так что в качестве базового класса выбран TypedPointInputAdapter.

Реализованы методы Resume() и Start(), которые просто вызывают метод ProduceEvents(), на котором стоит остановиться подробнее.

currEvent = CreateInsertEvent();

Создает новое событие с типом INSERT, далее поля события (включая время возникновения) заполняются. После того как все поля заполнены, вызывается метод Enqueue(), который ставит новое событие в очередь обработки.

В коде также выполняется достаточно много проверок, которые обязательно нужно производить, например, может оказаться, что очередь событий переполнена, в таком случае новое событие не может быть добавлено в очередь.

В файле TollPointInputFactory.cs описана фабрика входных адаптеров, метод Create которой просто возвращает экземпляр только что рассмотренного адаптера.


public
class
TollPointInputFactory : ITypedInputAdapterFactory<TollPointInputConfig>, ITypedDeclareAdvanceTimeProperties<TollPointInputConfig>

{


public
InputAdapterBase Create<TollPointEvent>(TollPointInputConfig configInfo, EventShape eventShape)

{


return
new
TollPointInput<TollPointEvent>(configInfo);

}


public
void Dispose()

{

}


public
AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(TollPointInputConfig configInfo, EventShape eventShape)

{


var atgs = new
AdvanceTimeGenerationSettings(configInfo.CtiFrequency, TimeSpan.FromSeconds(0), true);


var ats = new
AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);


return ats;

}

}

Метод DeclareAdvanceTimeProperties устанавливает политику работы с событиями CTI.

Первая строка метода создает настройки времени таким образом, чтобы событие CTI создавалось после каждого configInfo.CtiFrequency-го события. В основной программе для этого параметра будет задано значение 1, это значит, что событие CTI будет создаваться после каждого события.

Во второй строчке указывается политика обработки событий, которые поступили позже события CTI, но их временная отметка более раняя. В данном случае устанавливается значение Drop (на самом деле мы не могли установить другое значение, поскольку мы работаем с мгновенными (Point) событиями).

Файл GridOutputAdapter.cs содержит описание выходного адаптера


public
class
GridOutputAdapter : PointOutputAdapter

{


private
EventWaitHandle _adapterStopSignal;


private
CepEventType _bindtimeEventType;


private
int _eventsDequeued = 0;


public GridOutputAdapter(string StopSignalName, CepEventType EventType)

{

_bindtimeEventType = EventType;

_adapterStopSignal = EventWaitHandle.OpenExisting(StopSignalName);

}


public
override
void Start()

{


List<string> columnHeaders = new
List<string>();

columnHeaders.Add(«Command»);

columnHeaders.Add(«Timestamp»);


for (int fieldCounter = 0; fieldCounter < _bindtimeEventType.FieldsByOrdinal.Count; fieldCounter++)

{


CepEventTypeField eventFieldType = _bindtimeEventType.FieldsByOrdinal[fieldCounter];

columnHeaders.Add(eventFieldType.Name);

}

HighwayMonitor.Program.MainWindow.AddEventToDisplayListBox(columnHeaders.ToArray(), _bindtimeEventType.FieldsByOrdinal.Count + 2);

ConsumeEvents();

}


public
override
void Resume()

{

ConsumeEvents();

}


protected
override
void Dispose(bool disposing)

{


base.Dispose(disposing);

}


private
void ConsumeEvents()

{


PointEvent currEvent = default(PointEvent);


DequeueOperationResult result;


try

{


while (true)

{


if (AdapterState.Stopping == AdapterState)

{

result = Dequeue(out currEvent);

PrepareToStop(currEvent, result);

Stopped();

_adapterStopSignal.Set();


return;

}


result = Dequeue(out currEvent);


if (DequeueOperationResult.Empty == result)

{

PrepareToResume();

Ready();


return;

}


else

{

_eventsDequeued++;


HighwayMonitor.Program.MainWindow.AddEventToDisplayListBox(CreateStringArrayFromEvent(currEvent), _bindtimeEventType.FieldsByOrdinal.Count + 2);

ReleaseEvent(ref currEvent);

}

}

}


catch (AdapterException e)

{


Console.WriteLine(«ConsumeEvents — « + e.Message + e.StackTrace);

}

}


private
void PrepareToStop(PointEvent currEvent, DequeueOperationResult result)

{


if (DequeueOperationResult.Success == result)

{

ReleaseEvent(ref currEvent);

}

}


private
void PrepareToResume()

{

}


private
string[] CreateStringArrayFromEvent(PointEvent currEvent)

{


if (EventKind.Cti == currEvent.EventKind)

{


return
new
string[] {null, «CTI», currEvent.StartTime.ToString() };

}


else

{


List<string> eventDetails = new
List<string>();

eventDetails.Add(null); // leave the first column for an image

eventDetails.Add(«INSERT»);

eventDetails.Add(currEvent.StartTime.ToString());


for (int fieldCounter = 0; fieldCounter < _bindtimeEventType.FieldsByOrdinal.Count; fieldCounter++)

{


CepEventTypeField eventFieldType = _bindtimeEventType.FieldsByOrdinal[fieldCounter];


object value = Convert.ChangeType(currEvent.GetField(fieldCounter), eventFieldType.Type.ClrType, CultureInfo.CurrentCulture);

eventDetails.Add((value != null) ? value.ToString() : «NULL»);

}


return eventDetails.ToArray();

}

}

}

Наиболее интересным для нас методом тут является метод ConsumeEvents(). При помощи метода Dequeue() из очереди извлекается событие (если оно там есть), из которого создается набор строк в методе CreateStringArrayFromEvent() . Полученные строки добавляются в грид на форме.

Обратите внимание на то, что в выходном адаптере мы уже не может работать с событием как с типизированными данными. Дело в том, что в ходе StreamInsight запроса мы могли создать (и, скорее всего, создали) проекцию данных, и этот новый тип данных мы уже не знаем. Так что мы просто проходим в цикле по всем имеющимся полям.

Фабрика выходных адаптеров описана в файле GridOutputAdapterFactory.cs:


public
class
GridOutputAdapterFactory : IOutputAdapterFactory<string>

{


public
OutputAdapterBase Create(string StopSignalName,EventShape Shape, CepEventType EventType)

{


return
new
GridOutputAdapter(StopSignalName, EventType);

}


public
void Dispose()

{

}

}

Тут совсем нет ничего интересного, мы просто возвращаем новый экземпляр выходного адаптера.

Основная логика приложения находится в методе обработчика нажатия кнопки Start формы, рассмотрим только этот метод.


private
void StartButton_Click(object sender, EventArgs buttonArgs)

{


if (StartButton.Text == «&Start»)

{

NeedToStop = false;


try

{

_tracer.WriteLine(«Starting Toll Tracker»);

_server = Microsoft.ComplexEventProcessing.Server.Create(«si»);

_application = _server.CreateApplication(«HighwayMonitor»);


var input = CepStream<TollPointEvent>.Create(«TollPointInputStream»,


typeof(TollPointInputFactory),

_inputConfig,


EventShape.Point);


//———————————————————————-


//- Modify this query — must have a resultant query called queryOutput


// 2.1.01 — original query


var queryOutput = from e in input


select e;


//———————————————————————

_results = queryOutput.ToQuery(_application, «TollData», «TollData Query», typeof(GridOutputAdapterFactory),

_stopSignalName,


EventShape.Point,


StreamEventOrder.FullyOrdered);

_adapterStopSignal.Reset();

_results.Start();

StartButton.Text = «Sto&p»;

DisplayDataGridView.Rows.Clear();

DisplayDataGridView.Columns.Clear();

}


catch (Exception e)

{

_tracer.WriteLine(e.ToString());


if (_results != null) _results.Stop();


MessageBox.Show(«Unable to start query. Error returned was:\n\r» + e.ToString());

}

}


else

{

NeedToStop = true;

_adapterStopSignal.WaitOne();

_results.Stop();

_results = null;


if (_server != null)

{

_server.Dispose();

_server = null;

}

StartButton.Text = «&Start»;

}

}

Первое, на что стоит обратить внимание – это создание движка StreamInsight.

_server = Microsoft.ComplexEventProcessing.Server.Create(«si»);

Параметр метода Create() — это название экземпляра StreamInsight сервера, которые мы задавали при инсталляции (в моем случае – «si»).

Далее создается входной поток с использованием фабрики входных адаптеров.


var input = CepStream<TollPointEvent>.Create(«TollPointInputStream»,


typeof(TollPointInputFactory),

_inputConfig,


EventShape.Point);

queryOutput – это запрос для обработки входного потока, в данном случае просто выбираются все события.


var queryOutput = from e in input


select e;

Выходной поток инициализируется при помощи фабрики выходных адаптеров и запроса.

_results = queryOutput.ToQuery(_application, «TollData», «TollData Query», typeof(GridOutputAdapterFactory),

_stopSignalName,


EventShape.Point,


StreamEventOrder.FullyOrdered);

Обратите внимание на вызов метода ToQuery(), он преобразует LINQ запрос в запрос StreamInsight.

Вызов _results.Start(); запускает StreamInsight запрос на выполнение.

Если запустить приложение и нажать на кнопку Start, то должна получиться следующая картина:

Рекомендую ознакомиться с файлами Demo 2.1 QueryStreamInsightUsingLINQ.txt и Demo 2.2 Using Advanced Query Options.txt, в них содержатся примеры более интересных запросов для данного приложения.

Заключение

Технология StreamInsight, как минимум, заслуживает внимания, поскольку она предоставляет новый способ анализа данных. Причем, скорость обратки событий и парадигма в целом не могут не радовать.

С другой стороны, технология новая и некоторые моменты вызывают сомнения, на мой взгляд, наиболее очевидные недостатки технологии на сегодняшний день следующие:

  • Непонятная ценовая политика. Во время подготовки статьи я не смог найти бесплатной версии StreamInsight без ограничения по времени или возможности купить StreamInsight отдельно от SQL Server 2008 R2. (Хотя, возможно, я плохо искал)
  • Не понятно, каким образом аналитик должен писать LINQ запросы. Как пока известно, планируется разработка специального языка StreamInsight запросов, понятного аналитику, а не только разработчику.

Несмотря на указанные недостатки, я уверен, что уже сейчас найдется достаточно разработчиков, которых заинтересует данная технология, ведь задачи, которые может успешно решать StreamInsight, возникают достаточно часто.

Материалы для дальнейшего изучения

В рамках данной статьи были рассмотрены лишь основные аспекты работы со StreamInsight, для более глубокого понимания технологии, я рекомендую обратиться к следующим источникам:

Techdays.ru – материалы на русском

(http://www.techdays.ru/Search.aspx?Tag=SQL%2bServer%2b2008%2bR2)

SQL Server 2008 R2 Update for Developers Training Kit – крайне полезные обучающие материалы
(http://www.microsoft.com/downloads/details.aspx?familyid=FFFAAD6A-0153-4D41-B289-A3ED1D637C0D&displaylang=en)

Документация по StreamInsight (поставляется вместе с StreamInsight)

Реклама
Запись опубликована в рубрике Uncategorized. Добавьте в закладки постоянную ссылку.

Добавить комментарий

Заполните поля или щелкните по значку, чтобы оставить свой комментарий:

Логотип WordPress.com

Для комментария используется ваша учётная запись WordPress.com. Выход / Изменить )

Фотография Twitter

Для комментария используется ваша учётная запись Twitter. Выход / Изменить )

Фотография Facebook

Для комментария используется ваша учётная запись Facebook. Выход / Изменить )

Google+ photo

Для комментария используется ваша учётная запись Google+. Выход / Изменить )

Connecting to %s