Оглавление
- Что такое EventStoreDB?
- Архитектура Event Sourcing
- Установка и запуск EventStoreDB
- Установка клиентской библиотеки для PowerShell
- Полная реализация клиента для PowerShell
- Примеры использования Event Sourcing
- Производительность и масштабирование
- Мониторинг и администрирование
- Резервное копирование и восстановление
- Сравнение с другими решениями
- Плюсы EventStoreDB для Event Sourcing
- Минусы и ограничения
- Рекомендации по использованию
- Заключение
Что такое EventStoreDB?
EventStoreDB — это специализированная база данных для Event Sourcing и потоковой обработки событий:
- Основана на принципе «append-only» журнала событий
- Оптимизирована для хранения последовательности событий
- Поддерживает подписки и проекции
- ACID-транзакции для событий
- Использует gRPC и HTTP API
Архитектура Event Sourcing
graph TB
A[PowerShell Script] --> B[EventStoreDB Client]
B --> C[EventStoreDB Server]
subgraph "EventStoreDB Architecture"
C --> D[Event Streams]
D --> E[Append-Only Journal]
E --> F[Indexes]
C --> G[Projections Engine]
G --> H[Materialized Views]
C --> I[Subscriptions]
I --> J[Real-time Consumers]
end
subgraph "Event Sourcing Pattern"
K[Aggregate] --> L[Command]
L --> M[Event]
M --> N[Event Store]
N --> O[State Rebuild]
O --> P[Current State]
end
B --> K
N --> C
style A fill:#e1f5fe
style C fill:#f3e5f5
style M fill:#fff3e0
Установка и запуск EventStoreDB
1. Вариант установки: Docker (рекомендуется)
# Запуск EventStoreDB в Docker
docker run -d `
--name eventstore `
-p 2113:2113 ` # HTTP API и UI
-p 1113:1113 ` # gRPC порт
-v C:\Data\EventStoreDB:/var/lib/eventstore `
eventstore/eventstore:22.10.0-buster-slim `
--insecure ` # Для разработки без SSL
--run-projections=All `
--enable-external-tcp `
--enable-atom-pub-over-http
# Проверка работы
Invoke-RestMethod -Uri "http://localhost:2113/stats" | ConvertTo-Json
2. Вариант установки: Native Windows
# Скачивание EventStoreDB для Windows
$version = "22.10.0"
$url = "https://github.com/EventStore/EventStore/releases/download/oss-v$version/EventStore-OSS-Win-$version.zip"
$tempPath = "$env:TEMP\EventStoreDB-$version"
# Скачивание и распаковка
Invoke-WebRequest -Uri $url -OutFile "$tempPath.zip"
Expand-Archive -Path "$tempPath.zip" -DestinationPath $tempPath -Force
# Конфигурационный файл
$config = @"
Db: C:\Data\EventStoreDB
Log: C:\Logs\EventStoreDB
RunProjections: All
ClusterSize: 1
StartStandardProjections: true
EnableAtomPubOverHTTP: true
"@ | Out-File "$tempPath\eventstore.conf" -Encoding UTF8
# Запуск как фоновый процесс
$process = Start-Process `
-FilePath "$tempPath\EventStore.ClusterNode.exe" `
-ArgumentList "--config $tempPath\eventstore.conf" `
-NoNewWindow `
-PassThru
# Проверка запуска
Start-Sleep -Seconds 5
Test-NetConnection -ComputerName localhost -Port 2113
3. Вариант установки: Windows Service
# Установка как службы Windows (требует прав администратора)
$serviceName = "EventStoreDB"
$installPath = "C:\Program Files\EventStoreDB"
# Создание службы
New-Service `
-Name $serviceName `
-BinaryPathName "$installPath\EventStore.ClusterNode.exe --config $installPath\eventstore.conf" `
-DisplayName "EventStoreDB" `
-StartupType Automatic `
-Description "EventStoreDB Event Sourcing Database"
# Запуск службы
Start-Service -Name $serviceName
Get-Service -Name $serviceName
Установка клиентской библиотеки для PowerShell
1. Установка через NuGet
# Установка NuGet провайдера
Install-PackageProvider -Name NuGet -Force -MinimumVersion 2.8.5.201
# Установка официального клиента .NET
Install-Package EventStore.Client.Grpc -ProviderName NuGet
# Альтернатива: установка через dotnet CLI
dotnet add package EventStore.Client.Grpc --version 22.10.0
2. Ручная установка с зависимостями
function Install-EventStoreClient {
param(
[string]$InstallPath = ".\lib"
)
New-Item -ItemType Directory -Path $InstallPath -Force
# Основные пакеты
$packages = @(
@{Name = "EventStore.Client.Grpc"; Version = "22.10.0"},
@{Name = "EventStore.Client.Grpc.PersistentSubscriptions"; Version = "22.10.0"},
@{Name = "EventStore.Client.Grpc.ProjectionManagement"; Version = "22.10.0"},
@{Name = "Grpc.Net.Client"; Version = "2.52.0"},
@{Name = "Grpc.Tools"; Version = "2.52.0"}
)
foreach ($package in $packages) {
Write-Host "Установка $($package.Name) v$($package.Version)..." -ForegroundColor Yellow
Install-Package -Name $package.Name -RequiredVersion $package.Version `
-ProviderName NuGet -Destination $InstallPath -Force
}
# Загрузка сборок
Get-ChildItem -Path $InstallPath -Filter "*.dll" | ForEach-Object {
try {
Add-Type -Path $_.FullName
Write-Host "Загружена: $($_.Name)" -ForegroundColor Green
} catch {
Write-Warning "Не удалось загрузить: $($_.Name) - $_"
}
}
return $InstallPath
}
# Установка
$libPath = Install-EventStoreClient -InstallPath "C:\PowerShell\EventStore"
Полная реализация клиента для PowerShell
using namespace EventStore.Client
using namespace System.Threading.Tasks
using namespace Google.Protobuf
class EventStorePowerShellClient {
# Клиентские подключения
hidden [EventStoreClient]$EventStoreClient
hidden [EventStorePersistentSubscriptionsClient]$PersistentSubscriptionsClient
hidden [EventStoreProjectionManagementClient]$ProjectionClient
hidden [string]$ConnectionString
hidden [hashtable]$StreamMetadataCache = @{}
# Конструктор
EventStorePowerShellClient([string]$host = "localhost", [int]$httpPort = 2113, [int]$grpcPort = 1113) {
$this.ConnectionString = "esdb://${host}:${httpPort}?tls=false"
$this.InitializeClients()
}
# Инициализация клиентов
hidden [void]InitializeClients() {
# Настройки подключения
$settings = [EventStoreClientSettings]::Create($this.ConnectionString)
# Создание клиентов
$this.EventStoreClient = [EventStoreClient]::new($settings)
$this.PersistentSubscriptionsClient = [EventStorePersistentSubscriptionsClient]::new($settings)
$this.ProjectionClient = [EventStoreProjectionManagementClient]::new($settings)
Write-Host "Подключено к EventStoreDB: $($this.ConnectionString)" -ForegroundColor Green
}
# ========== ОСНОВНЫЕ ОПЕРАЦИИ СОБЫТИЙ ==========
# Запись события в поток
[void]AppendEvent([string]$streamName, [string]$eventType, [object]$data, [hashtable]$metadata = @{}) {
$eventData = $this.CreateEventData($eventType, $data, $metadata)
$task = $this.EventStoreClient.AppendToStreamAsync(
$streamName,
[StreamState]::Any,
@($eventData)
)
$result = $this.AwaitTask($task)
Write-Host "Событие записано в поток '$streamName', позиция: $($result.NextExpectedStreamRevision)" -ForegroundColor Green
}
# Пакетная запись событий
[void]AppendEvents([string]$streamName, [hashtable[]]$events) {
$eventDataList = [System.Collections.Generic.List[EventData]]::new()
foreach ($event in $events) {
$eventData = $this.CreateEventData($event.Type, $event.Data, $event.Metadata)
$eventDataList.Add($eventData)
}
$task = $this.EventStoreClient.AppendToStreamAsync(
$streamName,
[StreamState]::Any,
$eventDataList
)
$result = $this.AwaitTask($task)
Write-Host "Записано $($events.Count) событий в поток '$streamName'" -ForegroundColor Green
}
# Чтение событий из потока
[PSObject[]]ReadStreamEvents([string]$streamName, [long]$fromPosition = 0, [int]$maxCount = 100) {
$events = [System.Collections.Generic.List[PSObject]]::new()
$task = $this.EventStoreClient.ReadStreamAsync(
[Direction]::Forwards,
$streamName,
[StreamPosition]::new($fromPosition),
$maxCount
)
try {
$result = $this.AwaitTask($task)
foreach ($resolvedEvent in $result) {
$event = $this.ConvertToPowerShellObject($resolvedEvent)
$events.Add($event)
}
} catch [StreamNotFoundException] {
Write-Warning "Поток '$streamName' не найден"
}
return $events.ToArray()
}
# Чтение всех событий из потока
[PSObject[]]ReadAllStreamEvents([string]$streamName) {
$allEvents = [System.Collections.Generic.List[PSObject]]::new()
$position = 0
$batchSize = 500
do {
$events = $this.ReadStreamEvents($streamName, $position, $batchSize)
if ($events.Count -eq 0) { break }
$allEvents.AddRange($events)
$position += $events.Count
Write-Progress -Activity "Чтение событий из '$streamName'" `
-Status "Прочитано: $position" `
-PercentComplete (($position % 1000) / 10)
} while ($events.Count -eq $batchSize)
return $allEvents.ToArray()
}
# Подписка на поток в реальном времени
[void]SubscribeToStream([string]$streamName, [scriptblock]$handler) {
$task = $this.EventStoreClient.SubscribeToStreamAsync(
$streamName,
{
param($subscription, $resolvedEvent, $cancellationToken)
$event = $this.ConvertToPowerShellObject($resolvedEvent)
# Выполнение PowerShell скриптблока
[powershell]::Create().AddScript($handler).AddArgument($event).Invoke()
return [Task]::CompletedTask
}.GetNewClosure(),
$true # resolveLinkTos
)
$this.AwaitTask($task)
Write-Host "Подписка на поток '$streamName' активна" -ForegroundColor Cyan
}
# ========== УПРАВЛЕНИЕ ПОТОКАМИ ==========
# Получение информации о потоке
[hashtable]GetStreamMetadata([string]$streamName) {
if ($this.StreamMetadataCache.ContainsKey($streamName)) {
return $this.StreamMetadataCache[$streamName]
}
$task = $this.EventStoreClient.GetStreamMetadataAsync($streamName)
$metadataResult = $this.AwaitTask($task)
$metadata = @{
StreamName = $streamName
MaxAge = $metadataResult.StreamMetadata.MaxAge?.TotalSeconds
MaxCount = $metadataResult.StreamMetadata.MaxCount
TruncateBefore = $metadataResult.StreamMetadata.TruncateBefore
CacheControl = $metadataResult.StreamMetadata.CacheControl?.ToString()
Acl = $metadataResult.StreamMetadata.Acl
LastEventNumber = $metadataResult.MetastreamRevision
}
$this.StreamMetadataCache[$streamName] = $metadata
return $metadata
}
# Установка метаданных потока
[void]SetStreamMetadata([string]$streamName, [hashtable]$metadata) {
$streamMetadata = [StreamMetadata]::new()
if ($metadata.MaxAge) {
$streamMetadata.SetMaxAge([TimeSpan]::FromSeconds($metadata.MaxAge))
}
if ($metadata.MaxCount) {
$streamMetadata.SetMaxCount($metadata.MaxCount)
}
if ($metadata.TruncateBefore) {
$streamMetadata.SetTruncateBefore($metadata.TruncateBefore)
}
$task = $this.EventStoreClient.SetStreamMetadataAsync(
$streamName,
[StreamState]::Any,
$streamMetadata
)
$this.AwaitTask($task)
Write-Host "Метаданные потока '$streamName' обновлены" -ForegroundColor Yellow
}
# Удаление потока
[void]DeleteStream([string]$streamName, [bool]$hardDelete = $false) {
$task = $this.EventStoreClient.DeleteAsync(
$streamName,
[StreamState]::Any,
[DeleteOptions]::new($hardDelete)
)
$this.AwaitTask($task)
if ($this.StreamMetadataCache.ContainsKey($streamName)) {
$this.StreamMetadataCache.Remove($streamName)
}
Write-Host "Поток '$streamName' удален ($(if ($hardDelete) {'hard'} else {'soft'}))" -ForegroundColor Red
}
# ========== ПРОЕКЦИИ ==========
# Создание проекции
[void]CreateProjection([string]$name, [string]$query) {
$task = $this.ProjectionClient.CreateContinuousAsync(
$name,
$query,
$true, # trackEmittedStreams
[System.Threading.CancellationToken]::None
)
$this.AwaitTask($task)
Write-Host "Проекция '$name' создана" -ForegroundColor Green
}
# Выполнение одноразовой проекции
[string]ExecuteProjection([string]$name, [string]$query) {
$task = $this.ProjectionClient.CreateOneTimeAsync(
$query,
[System.Threading.CancellationToken]::None
)
$result = $this.AwaitTask($task)
return $result
}
# Получение состояния проекции
[PSObject]GetProjectionState([string]$name) {
$task = $this.ProjectionClient.GetStateAsync(
$name,
[System.Threading.CancellationToken]::None
)
$result = $this.AwaitTask($task)
return $result | ConvertFrom-Json
}
# ========== PERSISTENT SUBSCRIPTIONS ==========
# Создание persistent subscription
[void]CreatePersistentSubscription(
[string]$streamName,
[string]$groupName,
[hashtable]$settings = @{}
) {
$subscriptionSettings = [PersistentSubscriptionSettings]::new()
if ($settings.ResolveLinkTos) { $subscriptionSettings.ResolveLinkTos = $settings.ResolveLinkTos }
if ($settings.StartFrom) { $subscriptionSettings.StartFrom = $settings.StartFrom }
if ($settings.MaxRetryCount) { $subscriptionSettings.MaxRetryCount = $settings.MaxRetryCount }
if ($settings.MaxSubscriberCount) { $subscriptionSettings.MaxSubscriberCount = $settings.MaxSubscriberCount }
$task = $this.PersistentSubscriptionsClient.CreateAsync(
$streamName,
$groupName,
$subscriptionSettings
)
$this.AwaitTask($task)
Write-Host "Persistent subscription '$groupName' создана для потока '$streamName'" -ForegroundColor Green
}
# Подписка на persistent subscription
[void]SubscribeToPersistentSubscription(
[string]$streamName,
[string]$groupName,
[scriptblock]$eventHandler,
[scriptblock]$subscriptionDroppedHandler
) {
$task = $this.PersistentSubscriptionsClient.SubscribeAsync(
$streamName,
$groupName,
{
param($subscription, $resolvedEvent, $retryCount, $cancellationToken)
$event = $this.ConvertToPowerShellObject($resolvedEvent)
$event | Add-Member -NotePropertyName "RetryCount" -NotePropertyValue $retryCount -Force
# Обработка события в PowerShell
try {
$result = [powershell]::Create().AddScript($eventHandler).AddArgument($event).Invoke()
if ($result -contains $true) {
$subscription.Ack($resolvedEvent)
} else {
$subscription.Nack([NackAction]::Retry, "PowerShell обработка не подтверждена", $resolvedEvent)
}
} catch {
$subscription.Nack([NackAction]::Retry, $_.Exception.Message, $resolvedEvent)
}
return [Task]::CompletedTask
}.GetNewClosure(),
{
param($subscription, $dropReason, $exception)
$dropInfo = [PSCustomObject]@{
DropReason = $dropReason
Exception = $exception
Timestamp = Get-Date
}
[powershell]::Create().AddScript($subscriptionDroppedHandler).AddArgument($dropInfo).Invoke()
return [Task]::CompletedTask
}.GetNewClosure()
)
$this.AwaitTask($task)
Write-Host "Подписка на persistent subscription '$groupName' активна" -ForegroundColor Cyan
}
# ========== ВСПОМОГАТЕЛЬНЫЕ МЕТОДЫ ==========
# Создание EventData из PowerShell объекта
hidden [EventData]CreateEventData([string]$eventType, [object]$data, [hashtable]$metadata) {
# Сериализация данных
$dataJson = $data | ConvertTo-Json -Depth 10 -Compress
$dataBytes = [System.Text.Encoding]::UTF8.GetBytes($dataJson)
# Сериализация метаданных
$metadataJson = $metadata | ConvertTo-Json -Depth 5 -Compress
$metadataBytes = [System.Text.Encoding]::UTF8.GetBytes($metadataJson)
# Создание EventData с уникальным ID
$eventId = [Guid]::NewGuid()
return [EventData]::new(
[Uuid]::new($eventId),
$eventType,
$dataBytes,
$metadataBytes,
"application/json"
)
}
# Конвертация ResolvedEvent в PowerShell объект
hidden [PSObject]ConvertToPowerShellObject([ResolvedEvent]$resolvedEvent) {
$event = $resolvedEvent.Event
# Десериализация данных
$dataJson = [System.Text.Encoding]::UTF8.GetString($event.Data.ToArray())
$metadataJson = [System.Text.Encoding]::UTF8.GetString($event.Metadata.ToArray())
try {
$data = $dataJson | ConvertFrom-Json
} catch {
$data = $dataJson
}
try {
$metadata = $metadataJson | ConvertFrom-Json
} catch {
$metadata = @{ Raw = $metadataJson }
}
return [PSCustomObject]@{
StreamId = $event.EventStreamId
EventId = $event.EventId.ToString()
EventType = $event.EventType
EventNumber = $event.EventNumber.ToUInt64()
Created = $event.Created
Data = $data
Metadata = $metadata
Position = $resolvedEvent.OriginalEventNumber?.ToUInt64()
IsResolved = $resolvedEvent.OriginalEvent -ne $null
}
}
# Ожидание завершения Task
hidden [object]AwaitTask([Task]$task) {
$task.Wait()
if ($task.IsFaulted) {
throw $task.Exception.InnerException
}
if ($task.GetType().IsGenericType) {
$resultProperty = $task.GetType().GetProperty("Result")
return $resultProperty.GetValue($task)
}
return $null
}
# Деструктор
[void]Dispose() {
if ($this.EventStoreClient -ne $null) {
$this.EventStoreClient.Dispose()
}
if ($this.PersistentSubscriptionsClient -ne $null) {
$this.PersistentSubscriptionsClient.Dispose()
}
if ($this.ProjectionClient -ne $null) {
$this.ProjectionClient.Dispose()
}
Write-Host "Клиент EventStoreDB отключен" -ForegroundColor Yellow
}
}
Примеры использования Event Sourcing
Пример 1: Банковский аккаунт с Event Sourcing
# Определяем события
enum BankAccountEvents {
AccountOpened
MoneyDeposited
MoneyWithdrawn
AccountClosed
}
class BankAccountAggregate {
[string]$AccountId
[string]$Owner
[decimal]$Balance
[bool]$IsClosed
[int]$Version
# Применение событий
[void]Apply([PSObject]$event) {
$this.Version++
switch ($event.EventType) {
([BankAccountEvents]::AccountOpened.ToString()) {
$this.AccountId = $event.Data.AccountId
$this.Owner = $event.Data.Owner
$this.Balance = 0
$this.IsClosed = $false
break
}
([BankAccountEvents]::MoneyDeposited.ToString()) {
$this.Balance += $event.Data.Amount
break
}
([BankAccountEvents]::MoneyWithdrawn.ToString()) {
if ($this.Balance - $event.Data.Amount -lt 0) {
throw "Недостаточно средств"
}
$this.Balance -= $event.Data.Amount
break
}
([BankAccountEvents]::AccountClosed.ToString()) {
$this.IsClosed = $true
break
}
}
}
# Восстановление состояния из событий
static [BankAccountAggregate]LoadFromEvents([PSObject[]]$events) {
$aggregate = [BankAccountAggregate]::new()
foreach ($event in $events) {
$aggregate.Apply($event)
}
return $aggregate
}
}
# Репозиторий для агрегата
class BankAccountRepository {
[EventStorePowerShellClient]$EventStore
[string]$StreamPrefix = "bankaccount"
BankAccountRepository([EventStorePowerShellClient]$eventStore) {
$this.EventStore = $eventStore
}
[BankAccountAggregate]Load([string]$accountId) {
$streamName = "$($this.StreamPrefix)-$accountId"
$events = $this.EventStore.ReadAllStreamEvents($streamName)
if ($events.Count -eq 0) {
return $null
}
return [BankAccountAggregate]::LoadFromEvents($events)
}
[void]Save([BankAccountAggregate]$aggregate, [hashtable[]]$events) {
$streamName = "$($this.StreamPrefix)-$($aggregate.AccountId)"
$eventData = @()
foreach ($event in $events) {
$eventData += @{
Type = $event.Type
Data = $event.Data
Metadata = @{
AggregateId = $aggregate.AccountId
AggregateType = "BankAccount"
Version = $aggregate.Version
Timestamp = Get-Date -Format "o"
}
}
}
$this.EventStore.AppendEvents($streamName, $eventData)
}
}
# Команды и обработчики
class BankAccountCommandHandler {
[BankAccountRepository]$Repository
BankAccountCommandHandler([EventStorePowerShellClient]$eventStore) {
$this.Repository = [BankAccountRepository]::new($eventStore)
}
[void]HandleOpenAccount([hashtable]$command) {
$accountId = [Guid]::NewGuid().ToString()
# Создаем события
$events = @(
@{
Type = [BankAccountEvents]::AccountOpened.ToString()
Data = @{
AccountId = $accountId
Owner = $command.Owner
InitialBalance = $command.InitialBalance
CreatedAt = Get-Date -Format "o"
}
}
)
if ($command.InitialBalance -gt 0) {
$events += @{
Type = [BankAccountEvents]::MoneyDeposited.ToString()
Data = @{
Amount = $command.InitialBalance
Description = "Initial deposit"
}
}
}
# Сохраняем события
$aggregate = [BankAccountAggregate]::new()
$aggregate.AccountId = $accountId
$this.Repository.Save($aggregate, $events)
Write-Host "Счет открыт: $accountId" -ForegroundColor Green
}
[void]HandleDeposit([hashtable]$command) {
$account = $this.Repository.Load($command.AccountId)
if ($null -eq $account) {
throw "Счет не найден"
}
if ($account.IsClosed) {
throw "Счет закрыт"
}
$events = @(
@{
Type = [BankAccountEvents]::MoneyDeposited.ToString()
Data = @{
Amount = $command.Amount
Description = $command.Description
Reference = $command.Reference
}
}
)
$this.Repository.Save($account, $events)
Write-Host "Депозит выполнен: $($command.Amount)" -ForegroundColor Green
}
[hashtable]GetAccountState([string]$accountId) {
$account = $this.Repository.Load($accountId)
if ($null -eq $account) {
return $null
}
return @{
AccountId = $account.AccountId
Owner = $account.Owner
Balance = $account.Balance
IsClosed = $account.IsClosed
Version = $account.Version
}
}
}
# Использование
try {
$eventStore = [EventStorePowerShellClient]::new()
$handler = [BankAccountCommandHandler]::new($eventStore)
# Открытие счета
$handler.HandleOpenAccount(@{
Owner = "Иван Иванов"
InitialBalance = 1000
})
# Получение первого счета (в реальности нужно знать ID)
# Для демо создадим еще один
$handler.HandleOpenAccount(@{
Owner = "Петр Петров"
InitialBalance = 500
})
} finally {
if ($eventStore -ne $null) {
$eventStore.Dispose()
}
}
Пример 2: Система заказов (e-commerce)
enum OrderEvents {
OrderCreated
OrderItemAdded
OrderItemRemoved
OrderQuantityChanged
OrderShipped
OrderCancelled
PaymentReceived
}
class OrderProjection {
[hashtable]$Orders = @{}
[hashtable]$CustomerOrders = @{}
[hashtable]$DailyStats = @{}
# Обработка событий для проекции
[void]HandleEvent([PSObject]$event) {
switch ($event.EventType) {
([OrderEvents]::OrderCreated.ToString()) {
$orderId = $event.Data.OrderId
$customerId = $event.Data.CustomerId
$this.Orders[$orderId] = @{
OrderId = $orderId
CustomerId = $customerId
Status = "Created"
Items = @()
TotalAmount = 0
CreatedAt = $event.Data.CreatedAt
}
# Индекс по клиенту
if (-not $this.CustomerOrders.ContainsKey($customerId)) {
$this.CustomerOrders[$customerId] = [System.Collections.Generic.List[string]]::new()
}
$this.CustomerOrders[$customerId].Add($orderId)
# Статистика по дням
$date = ([DateTime]$event.Data.CreatedAt).Date.ToString("yyyy-MM-dd")
if (-not $this.DailyStats.ContainsKey($date)) {
$this.DailyStats[$date] = @{ OrderCount = 0; TotalAmount = 0 }
}
$this.DailyStats[$date].OrderCount++
break
}
([OrderEvents]::OrderItemAdded.ToString()) {
$orderId = $event.Data.OrderId
$item = $event.Data.Item
if ($this.Orders.ContainsKey($orderId)) {
$this.Orders[$orderId].Items += $item
$this.Orders[$orderId].TotalAmount += $item.Price * $item.Quantity
}
break
}
([OrderEvents]::OrderShipped.ToString()) {
$orderId = $event.Data.OrderId
if ($this.Orders.ContainsKey($orderId)) {
$this.Orders[$orderId].Status = "Shipped"
$this.Orders[$orderId].ShippedAt = $event.Data.ShippedAt
}
break
}
}
}
# Получение заказа по ID
[hashtable]GetOrder([string]$orderId) {
return $this.Orders[$orderId]
}
# Получение заказов клиента
[hashtable[]]GetCustomerOrders([string]$customerId) {
$orders = @()
if ($this.CustomerOrders.ContainsKey($customerId)) {
foreach ($orderId in $this.CustomerOrders[$customerId]) {
$orders += $this.Orders[$orderId]
}
}
return $orders
}
}
# Создание проекции в EventStoreDB
function Initialize-OrderProjection {
param(
[EventStorePowerShellClient]$EventStore
)
$projectionQuery = @"
fromStream('orders')
.when({
OrderCreated: function(s, e) {
linkTo('customer-' + e.data.customerId, e);
emit('order-stats', 'OrderCreated', e.data);
},
OrderItemAdded: function(s, e) {
emit('order-stats', 'OrderItemAdded', e.data);
},
OrderShipped: function(s, e) {
linkTo('shipped-orders', e);
emit('order-stats', 'OrderShipped', e.data);
}
})
"@
$EventStore.CreateProjection("order-processing", $projectionQuery)
}
# Подписка на события заказов
function Subscribe-ToOrderEvents {
param(
[EventStorePowerShellClient]$EventStore,
[OrderProjection]$Projection
)
$EventStore.SubscribeToStream("orders", {
param($event)
$Projection.HandleEvent($event)
# Дополнительная обработка
switch ($event.EventType) {
([OrderEvents]::OrderCreated.ToString()) {
Write-Host "Новый заказ создан: $($event.Data.OrderId)" -ForegroundColor Green
break
}
([OrderEvents]::OrderShipped.ToString()) {
Write-Host "Заказ отправлен: $($event.Data.OrderId)" -ForegroundColor Cyan
break
}
}
})
}
Пример 3: Аудит и логгирование
class AuditLogSystem {
[EventStorePowerShellClient]$EventStore
[string]$AuditStream = "system-audit"
AuditLogSystem([EventStorePowerShellClient]$eventStore) {
$this.EventStore = $eventStore
}
# Логирование действия
[void]LogAction([string]$userId, [string]$action, [string]$resourceType, [string]$resourceId, [hashtable]$details) {
$eventData = @{
UserId = $userId
Action = $action
ResourceType = $resourceType
ResourceId = $resourceId
Details = $details
Timestamp = Get-Date -Format "o"
IpAddress = (Get-NetIPAddress -AddressFamily IPv4 | Where-Object {$_.InterfaceAlias -like "*Ethernet*"}).IPAddress
UserAgent = "PowerShell"
}
$this.EventStore.AppendEvent(
$this.AuditStream,
"AuditActionPerformed",
$eventData,
@{
Severity = "Info"
System = "PowerShell"
}
)
}
# Поиск в аудит-логе
[PSObject[]]SearchAuditLog(
[string]$userId = $null,
[string]$action = $null,
[datetime]$from = $null,
[datetime]$to = $null
) {
$allEvents = $this.EventStore.ReadAllStreamEvents($this.AuditStream)
$filtered = $allEvents | Where-Object {
($null -eq $userId -or $_.Data.UserId -eq $userId) -and
($null -eq $action -or $_.Data.Action -eq $action) -and
($null -eq $from -or [DateTime]$_.Data.Timestamp -ge $from) -and
($null -eq $to -or [DateTime]$_.Data.Timestamp -le $to)
}
return $filtered
}
# Создание отчетов
[hashtable]GenerateAuditReport([datetime]$date) {
$events = $this.SearchAuditLog($null, $null, $date.Date, $date.Date.AddDays(1))
$report = @{
Date = $date.ToString("yyyy-MM-dd")
TotalActions = $events.Count
ActionsByUser = @{}
ActionsByType = @{}
HourlyDistribution = @{}
}
foreach ($event in $events) {
# Статистика по пользователям
$user = $event.Data.UserId
if (-not $report.ActionsByUser.ContainsKey($user)) {
$report.ActionsByUser[$user] = 0
}
$report.ActionsByUser[$user]++
# Статистика по типам действий
$action = $event.Data.Action
if (-not $report.ActionsByType.ContainsKey($action)) {
$report.ActionsByType[$action] = 0
}
$report.ActionsByType[$action]++
# Распределение по часам
$hour = ([DateTime]$event.Data.Timestamp).Hour
if (-not $report.HourlyDistribution.ContainsKey($hour)) {
$report.HourlyDistribution[$hour] = 0
}
$report.HourlyDistribution[$hour]++
}
return $report
}
}
# Использование аудит-системы
try {
$eventStore = [EventStorePowerShellClient]::new()
$auditSystem = [AuditLogSystem]::new($eventStore)
# Логирование действий
$auditSystem.LogAction("user123", "Create", "Document", "doc456", @{
FileName = "report.pdf"
Size = "2.5MB"
Location = "\\server\share"
})
$auditSystem.LogAction("user123", "Modify", "User", "user789", @{
Field = "Permissions"
OldValue = "Read"
NewValue = "ReadWrite"
})
# Генерация отчета
$report = $auditSystem.GenerateAuditReport((Get-Date))
$report | ConvertTo-Json -Depth 3 | Out-File "audit-report.json"
# Подписка на новые аудит-события
$eventStore.SubscribeToStream("system-audit", {
param($event)
$logMessage = "[$($event.Data.Timestamp)] $($event.Data.UserId) выполнил $($event.Data.Action) на $($event.Data.ResourceType):$($event.Data.ResourceId)"
Write-Host $logMessage -ForegroundColor Gray
# Можно отправлять в SIEM систему
# Invoke-RestMethod -Uri "https://siem.example.com/logs" -Method Post -Body ($event | ConvertTo-Json)
})
} finally {
if ($eventStore -ne $null) {
$eventStore.Dispose()
}
}
Пример 4: Обработка IoT данных
class IoTSensorProcessor {
[EventStorePowerShellClient]$EventStore
[string]$SensorStreamPrefix = "sensor"
[hashtable]$SensorCache = @{}
IoTSensorProcessor([EventStorePowerShellClient]$eventStore) {
$this.EventStore = $eventStore
}
# Запись показаний с датчика
[void]RecordSensorReading([string]$sensorId, [hashtable]$reading) {
$streamName = "$($this.SensorStreamPrefix)-$sensorId"
$eventData = @{
SensorId = $sensorId
Timestamp = Get-Date -Format "o"
Reading = $reading
Location = $this.SensorCache[$sensorId]?.Location
SensorType = $this.SensorCache[$sensorId]?.Type
}
$this.EventStore.AppendEvent(
$streamName,
"SensorReadingRecorded",
$eventData,
@{
DataVersion = "1.0"
Source = "IoT-Gateway"
}
)
# Обновление кэша
if (-not $this.SensorCache.ContainsKey($sensorId)) {
$this.SensorCache[$sensorId] = @{
LastReading = $reading
LastUpdated = Get-Date
ReadingCount = 0
}
}
$this.SensorCache[$sensorId].LastReading = $reading
$this.SensorCache[$sensorId].LastUpdated = Get-Date
$this.SensorCache[$sensorId].ReadingCount++
}
# Создание проекции для агрегации данных
[void]CreateSensorAggregationProjection() {
$projectionQuery = @"
fromCategory('sensor')
.when({
SensorReadingRecorded: function(state, event) {
var sensorId = event.data.sensorId;
var reading = event.data.reading;
var date = new Date(event.data.timestamp).toISOString().split('T')[0];
// Агрегация по дате и датчику
var key = sensorId + ':' + date;
if (!state[key]) {
state[key] = {
sensorId: sensorId,
date: date,
count: 0,
sum: {},
min: {},
max: {},
avg: {}
};
}
var agg = state[key];
agg.count++;
// Обработка каждого поля в reading
for (var field in reading) {
var value = parseFloat(reading[field]);
if (!agg.sum[field]) agg.sum[field] = 0;
if (!agg.min[field]) agg.min[field] = value;
if (!agg.max[field]) agg.max[field] = value;
agg.sum[field] += value;
agg.min[field] = Math.min(agg.min[field], value);
agg.max[field] = Math.max(agg.max[field], value);
agg.avg[field] = agg.sum[field] / agg.count;
}
return state;
}
})
.emit();
"@
$this.EventStore.CreateProjection("sensor-aggregations", $projectionQuery)
}
# Подписка на аномалии
[void]SubscribeToSensorAnomalies([string]$sensorId, [scriptblock]$anomalyHandler) {
$streamName = "$($this.SensorStreamPrefix)-$sensorId"
$this.EventStore.SubscribeToStream($streamName, {
param($event)
if ($event.EventType -eq "SensorReadingRecorded") {
$reading = $event.Data.Reading
# Проверка на аномалии (пример: температура вне диапазона)
if ($reading.ContainsKey("temperature")) {
$temp = $reading.temperature
if ($temp -lt -20 -or $temp -gt 50) {
$anomalyData = @{
SensorId = $sensorId
Timestamp = $event.Data.Timestamp
Metric = "temperature"
Value = $temp
Threshold = "[-20, 50]"
Severity = if ($temp -lt -30 -or $temp -gt 60) { "Critical" } else { "Warning" }
}
# Вызов обработчика аномалий
[powershell]::Create().AddScript($anomalyHandler).AddArgument($anomalyData).Invoke()
# Логирование аномалии
$this.EventStore.AppendEvent(
"sensor-anomalies",
"SensorAnomalyDetected",
$anomalyData,
@{ AlertSent = $true }
)
}
}
}
})
}
}
# Использование IoT процессора
try {
$eventStore = [EventStorePowerShellClient]::new()
$iotProcessor = [IoTSensorProcessor]::new($eventStore)
# Регистрация датчиков
1..10 | ForEach-Object {
$sensorId = "sensor-$_"
$iotProcessor.SensorCache[$sensorId] = @{
Location = "Room $_"
Type = if ($_ % 2 -eq 0) { "Temperature" } else { "Humidity" }
}
}
# Имитация показаний датчиков
$simulationJob = Start-Job -ScriptBlock {
param($eventStore)
$iotProcessor = [IoTSensorProcessor]::new($eventStore)
$random = [Random]::new()
while ($true) {
1..10 | ForEach-Object {
$sensorId = "sensor-$_"
$reading = if ($_ % 2 -eq 0) {
@{ temperature = $random.Next(15, 35) }
} else {
@{ humidity = $random.Next(30, 80) }
}
$iotProcessor.RecordSensorReading($sensorId, $reading)
}
Start-Sleep -Seconds 5
}
} -ArgumentList $eventStore
# Создание проекции для агрегации
$iotProcessor.CreateSensorAggregationProjection()
# Подписка на аномалии
$iotProcessor.SubscribeToSensorAnomalies("sensor-1", {
param($anomaly)
Write-Host "АНОМАЛИЯ: $($anomaly.SensorId) - $($anomaly.Metric)=$($anomaly.Value)" -ForegroundColor Red
# Можно отправить email или SMS
})
# Мониторинг в реальном времени
Write-Host "Мониторинг IoT датчиков запущен. Нажмите Ctrl+C для остановки." -ForegroundColor Cyan
while ($true) {
$aggregation = $eventStore.GetProjectionState("sensor-aggregations")
if ($aggregation) {
$stats = $aggregation | Get-Member -MemberType NoteProperty |
Select-Object -ExpandProperty Name | ForEach-Object {
$key = $_
$data = $aggregation.$key
[PSCustomObject]@{
SensorId = $data.sensorId
Date = $data.date
Count = $data.count
Temperature = if ($data.avg.temperature) { $data.avg.temperature } else { "N/A" }
Humidity = if ($data.avg.humidity) { $data.avg.humidity } else { "N/A" }
}
}
Clear-Host
$stats | Format-Table -AutoSize
}
Start-Sleep -Seconds 10
}
} finally {
if ($simulationJob) { Stop-Job -Job $simulationJob }
if ($eventStore -ne $null) { $eventStore.Dispose() }
}
Производительность и масштабирование
Бенчмарк EventStoreDB
function Test-EventStorePerformance {
param(
[int]$EventCount = 10000,
[string]$StreamPrefix = "perftest"
)
$eventStore = [EventStorePowerShellClient]::new()
try {
# Тест записи
$sw = [System.Diagnostics.Stopwatch]::StartNew()
1..$EventCount | ForEach-Object {
$streamName = "$StreamPrefix-$([Math]::Floor($_ / 1000))"
$eventStore.AppendEvent(
$streamName,
"TestEvent",
@{
Id = $_
Data = "x" * 1024 # 1KB данных
Timestamp = Get-Date -Format "o"
},
@{ Batch = [Math]::Floor($_ / 100) }
)
if ($_ % 1000 -eq 0) {
Write-Progress -Activity "Запись событий" -Status "$_ / $EventCount"
}
}
$sw.Stop()
$writePerf = [Math]::Round($EventCount / $sw.Elapsed.TotalSeconds, 0)
# Тест чтения
$sw.Restart()
$totalRead = 0
for ($i = 0; $i -lt 10; $i++) {
$streamName = "$StreamPrefix-$i"
$events = $eventStore.ReadAllStreamEvents($streamName)
$totalRead += $events.Count
}
$sw.Stop()
$readPerf = [Math]::Round($totalRead / $sw.Elapsed.TotalSeconds, 0)
# Тест подписки
$subscriptionEvents = 0
$sw.Restart()
$subscription = $eventStore.SubscribeToStream("$StreamPrefix-0", {
param($event)
$script:subscriptionEvents++
})
Start-Sleep -Seconds 5
# Добавим еще событий для триггера подписки
1..100 | ForEach-Object {
$eventStore.AppendEvent(
"$StreamPrefix-0",
"SubscriptionTest",
@{ Id = $_ }
)
}
Start-Sleep -Seconds 2
$sw.Stop()
$subscriptionPerf = [Math]::Round($script:subscriptionEvents / 7, 0) # За 7 секунд
return @{
WritePerformance = "$writePerf событий/сек"
ReadPerformance = "$readPerf событий/сек"
SubscriptionPerformance = "$subscriptionPerf событий/сек"
TotalEvents = $EventCount
TotalTime = $sw.Elapsed.TotalSeconds
}
} finally {
$eventStore.Dispose()
}
}
# Запуск бенчмарка
$results = Test-EventStorePerformance -EventCount 5000
$results | Format-List
Мониторинг и администрирование
EventStoreDB UI и API
# Доступ к веб-интерфейсу
Start-Process "http://localhost:2113"
# Получение статистики через REST API
function Get-EventStoreStats {
$stats = Invoke-RestMethod -Uri "http://localhost:2113/stats" -UseBasicParsing
return $stats | ConvertTo-Json -Depth 10
}
# Мониторинг через PowerShell
class EventStoreMonitor {
[string]$ApiUrl = "http://localhost:2113"
[hashtable]GetHealth() {
try {
$response = Invoke-WebRequest -Uri "$($this.ApiUrl)/health/live" -UseBasicParsing
return @{ Status = "Healthy"; Code = $response.StatusCode }
} catch {
return @{ Status = "Unhealthy"; Error = $_.Exception.Message }
}
}
[hashtable]GetMetrics() {
$stats = Invoke-RestMethod -Uri "$($this.ApiUrl)/stats" -UseBasicParsing
return @{
Events = @{
Total = $stats.'es.events'
PerSecond = $stats.'es.eventsPerSecond'
}
Streams = @{
Total = $stats.'es.streams'
}
Connections = @{
Active = $stats.'es.connections'
}
Memory = @{
Used = [Math]::Round($stats.'proc.mem' / 1MB, 2)
Peak = [Math]::Round($stats.'proc.memPeak' / 1MB, 2)
}
Disk = @{
Used = [Math]::Round($stats.'es.diskspace-used' / 1GB, 2)
Free = [Math]::Round($stats.'es.diskspace-free' / 1GB, 2)
}
}
}
[void]CreateAlert([string]$metric, [string]$operator, [int]$threshold) {
$alertScript = @"
while (`$true) {
`$metrics = Get-EventStoreStats
`$value = `$metrics.'$metric'
if ($operator `$value $threshold) {
Write-Host "ALERT: $metric $operator $threshold (Current: `$value)" -ForegroundColor Red
# Можно добавить отправку email, SMS и т.д.
}
Start-Sleep -Seconds 60
}
"@
Start-Job -ScriptBlock ([scriptblock]::Create($alertScript))
}
}
# Использование монитора
$monitor = [EventStoreMonitor]::new()
$health = $monitor.GetHealth()
$metrics = $monitor.GetMetrics()
Write-Host "Статус: $($health.Status)" -ForegroundColor $(if ($health.Status -eq "Healthy") { "Green" } else { "Red" })
$metrics | ConvertTo-Json -Depth 3 | Out-File "eventstore-metrics.json"
Резервное копирование и восстановление
# Резервное копирование EventStoreDB
function Backup-EventStoreDB {
param(
[string]$BackupPath = "C:\Backups\EventStoreDB",
[string]$EventStorePath = "C:\Data\EventStoreDB"
)
$timestamp = Get-Date -Format "yyyyMMdd-HHmmss"
$backupDir = Join-Path $BackupPath $timestamp
New-Item -ItemType Directory -Path $backupDir -Force
# Копирование файлов базы данных
Copy-Item -Path "$EventStorePath\*" -Destination $backupDir -Recurse -Force
# Создание снапшота через API
Invoke-RestMethod -Uri "http://localhost:2113/admin/shutdown" -Method Post -Body "ACTION=CREATE-SNAPSHOT"
Write-Host "Резервная копия создана: $backupDir" -ForegroundColor Green
return $backupDir
}
# Восстановление из резервной копии
function Restore-EventStoreDB {
param(
[string]$BackupDir,
[string]$EventStorePath = "C:\Data\EventStoreDB"
)
# Остановка EventStoreDB
Stop-Service -Name "EventStoreDB" -Force
# Очистка целевого каталога
Remove-Item -Path "$EventStorePath\*" -Recurse -Force -ErrorAction SilentlyContinue
# Восстановление из резервной копии
Copy-Item -Path "$BackupDir\*" -Destination $EventStorePath -Recurse -Force
# Запуск EventStoreDB
Start-Service -Name "EventStoreDB"
Write-Host "Восстановление завершено из: $BackupDir" -ForegroundColor Green
}
Сравнение с другими решениями
| Характеристика | EventStoreDB | SQL Server | MongoDB | Kafka |
|---|---|---|---|---|
| Тип | Event Store | Relational DB | Document DB | Message Broker |
| Event Sourcing | Нативная поддержка | Требуется кастомная реализация | Требуется кастомная реализация | Частичная поддержка |
| Производительность | 50,000+ событий/сек | 10,000+ событий/сек | 20,000+ событий/сек | 100,000+ сообщений/сек |
| Хранение событий | Append-only журнал | Таблицы событий | Коллекции документов | Топики |
| Подписки | Реализованы | Нет | Change Streams | Consumer Groups |
| Проекции | Встроенный движок | SQL Views | Агрегации | KSQL/Streams |
| Масштабируемость | Кластер | Кластер | Кластер | Кластер |
| Сложность | Средняя | Высокая | Средняя | Высокая |
Плюсы EventStoreDB для Event Sourcing
Преимущества:
- Специализация для Event Sourcing:
- Оптимизирована для записи событий
- Гарантированный порядок событий
- Поддержка optimistic concurrency
- Богатый функционал:
- Проекции для материализованных представлений
- Persistent subscriptions для надежной доставки
- Поддержка $all потоков
- Производительность:
- Высокая скорость записи событий
- Эффективное хранение во временных рядах
- Поддержка кластеризации
- Надёжность:
- ACID-гарантии для записей
- Восстановление после сбоев
- Репликация данных
- Экосистема:
- Клиенты для .NET, Java, JavaScript
- Интеграция с Grafana, Prometheus
- Поддержка gRPC и HTTP API
Минусы и ограничения
Недостатки:
- Требует сервер:
- Не встраиваемая библиотека
- Требует установки и обслуживания
- Потребляет ресурсы (RAM, CPU)
- Сложность настройки:
- Много параметров конфигурации
- Требуется понимание кластеризации
- SSL/TLS настройка
- Ограниченные запросы:
- Нет SQL запросов
- Проекции на JavaScript
- Сложные фильтры требуют проекций
- Кривая обучения:
- Требует понимания Event Sourcing
- Новые концепции (потоки, проекции)
- Отладка проекций
- Экосистема .NET:
- Меньше документации для PowerShell
- Требует .NET 5+ для последних версий
- Зависимости от gRPC
Рекомендации по использованию
Когда использовать EventStoreDB:
✅ Используйте EventStoreDB если:
- Строите систему на основе Event Sourcing
- Нужна полная история изменений
- Требуется аудит всех действий
- Реализуете CQRS архитектуру
- Работаете с временными рядами данных
❌ Не используйте EventStoreDB если:
- Нужна простая key-value база
- Проект маленький без требований к истории
- Нет возможности поддерживать сервер
- Требуются сложные SQL запросы
Лучшие практики для PowerShell:
- Используйте обёртки для упрощения работы
- Реализуйте агрегаты для бизнес-логики
- Создавайте проекции для запросов
- Настройте подписки для обработки в реальном времени
- Мониторьте метрики для производительности
Альтернативы для простых сценариев:
# Для простого event sourcing можно использовать:
# 1. SQLite с таблицей событий
# 2. Redis Streams
# 3. Azure Event Hubs
# 4. Apache Kafka
Заключение
EventStoreDB — это профессиональное решение для Event Sourcing, которое предоставляет:
Идеально подходит для:
- Систем с полным аудитом изменений
- Приложений с временными рядами данных
- Реализации CQRS архитектуры
- Систем, где важна история изменений
Требует:
- Понимания принципов Event Sourcing
- Настройки и обслуживания сервера
- Реализации проекций для запросов
Для PowerShell разработчиков, EventStoreDB предоставляет мощный инструмент для построения надежных систем с полной историей изменений, но требует инвестиций в изучение концепций и настройку инфраструктуры.