Files
Masterarbeit/Versuche/Versuch 02/Ergenisse/software/Integration_Patterns.md

1248 lines
43 KiB
Markdown

# Integration Patterns and Communication - Centron Enterprise Application
## Overview
This document provides comprehensive coverage of integration patterns and implementations within the Centron .NET 8 enterprise application. Integration patterns encompass external API integration, data transformation, error handling, retry mechanisms, circuit breakers, fault tolerance, message patterns, and service orchestration that enable seamless communication between systems.
## Integration Architecture
### **Multi-Channel Integration Strategy**
The Centron application supports multiple integration channels and patterns:
1. **REST API Integration**: HTTP-based services for modern web APIs
2. **SOAP Web Services**: Legacy system integration and enterprise services
3. **File-based Integration**: EDI, CSV, and XML file processing
4. **Database Integration**: Direct database connections and synchronization
5. **Message Queue Integration**: Asynchronous messaging and event processing
6. **Email Integration**: SMTP and email processing services
## External API Integration Patterns
### **1. REST Client Base Pattern**
The foundation for all external REST API integrations:
```csharp
public abstract class RestClientBase
{
protected readonly HttpClient _httpClient;
protected readonly ILogger _logger;
protected readonly RestClientCredentials _credentials;
protected AccessToken _accessToken;
protected RestClientBase(string baseUrl, RestClientCredentials credentials)
{
_credentials = credentials ?? throw new ArgumentNullException(nameof(credentials));
_logger = LogManager.GetCurrentClassLogger();
_httpClient = new HttpClient(new HttpClientHandler()
{
ServerCertificateCustomValidationCallback = ValidateCertificate
})
{
BaseAddress = new Uri(baseUrl),
Timeout = TimeSpan.FromSeconds(30)
};
// Set default headers
_httpClient.DefaultRequestHeaders.Accept.Clear();
_httpClient.DefaultRequestHeaders.Accept.Add(
new MediaTypeWithQualityHeaderValue("application/json"));
_httpClient.DefaultRequestHeaders.Add("User-Agent", "CentronClient/1.0");
}
protected async Task<Result<T>> SendRequestAsync<T>(
HttpRequestMessage request,
CancellationToken cancellationToken = default) where T : class
{
try
{
var response = await _httpClient.SendAsync(request, cancellationToken);
return await ProcessResponseAsync<T>(response);
}
catch (HttpRequestException ex)
{
_logger.Error(ex, "HTTP request failed: {Method} {Url}", request.Method, request.RequestUri);
return Result<T>.AsError($"HTTP request failed: {ex.Message}");
}
catch (TaskCanceledException ex) when (ex.InnerException is TimeoutException)
{
_logger.Error(ex, "Request timeout: {Method} {Url}", request.Method, request.RequestUri);
return Result<T>.AsError("Request timeout");
}
catch (Exception ex)
{
_logger.Error(ex, "Unexpected error during request: {Method} {Url}", request.Method, request.RequestUri);
return Result<T>.AsError($"Request failed: {ex.Message}");
}
}
protected async Task<Result<T>> ProcessResponseAsync<T>(HttpResponseMessage response) where T : class
{
var responseContent = await response.Content.ReadAsStringAsync();
if (!response.IsSuccessStatusCode)
{
var error = ParseErrorResponse(responseContent, response.StatusCode);
_logger.Error("API request failed: {StatusCode} - {Error}", response.StatusCode, error);
return Result<T>.AsError($"API Error ({response.StatusCode}): {error}");
}
try
{
if (typeof(T) == typeof(string))
{
return Result<T>.AsSuccess(responseContent as T);
}
var deserializedObject = JsonSerializer.Deserialize<T>(responseContent, new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
});
return Result<T>.AsSuccess(deserializedObject);
}
catch (JsonException ex)
{
_logger.Error(ex, "Failed to deserialize response: {Content}", responseContent);
return Result<T>.AsError($"Response deserialization failed: {ex.Message}");
}
}
protected async Task<Result<AccessToken>> GetJsonWebTokenAsync(string tokenEndpoint)
{
try
{
var tokenRequest = CreateTokenRequest();
var response = await _httpClient.SendAsync(tokenRequest);
if (!response.IsSuccessStatusCode)
{
var error = await response.Content.ReadAsStringAsync();
return Result<AccessToken>.AsError($"Token request failed: {error}");
}
var tokenResponse = await response.Content.ReadAsStringAsync();
var token = JsonSerializer.Deserialize<AccessToken>(tokenResponse);
return Result<AccessToken>.AsSuccess(token);
}
catch (Exception ex)
{
_logger.Error(ex, "Token acquisition failed");
return Result<AccessToken>.AsError($"Token acquisition failed: {ex.Message}");
}
}
private HttpRequestMessage CreateTokenRequest()
{
var request = new HttpRequestMessage(HttpMethod.Post, "/oauth/token");
var parameters = new Dictionary<string, string>
{
{"grant_type", "client_credentials"},
{"client_id", _credentials.ClientId},
{"client_secret", _credentials.ClientSecret}
};
if (!string.IsNullOrEmpty(_credentials.Scope))
parameters["scope"] = _credentials.Scope;
request.Content = new FormUrlEncodedContent(parameters);
return request;
}
private bool ValidateCertificate(HttpRequestMessage request, X509Certificate2 certificate, X509Chain chain, SslPolicyErrors errors)
{
// Implement certificate validation logic based on environment
if (_credentials.AllowInvalidCertificates)
return true;
return errors == SslPolicyErrors.None;
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
_httpClient?.Dispose();
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
// Specialized external API client implementation
public class FinApiClient : RestClientBase, IFinApiClient
{
private readonly IOnlineBankingConnectionDialog _dialogHandler;
private readonly bool _sandBoxMode;
public FinApiClient(RestClientCredentials clientCredentials, bool sandBoxMode = false, IOnlineBankingConnectionDialog dialogHandler = null)
: base(sandBoxMode ? FinApiConstants.SandboxAccessApiUrl : FinApiConstants.LiveAccessApiUrl, clientCredentials)
{
_dialogHandler = dialogHandler;
_sandBoxMode = sandBoxMode;
}
public async Task<Result<User>> GetUserAccountAsync()
{
if (_accessToken == null || _accessToken.IsExpired())
return Result<User>.AsError("No valid access token!");
_dialogHandler?.DisplayInfoMessage("Loading account data for the authorized user...");
var request = new HttpRequestMessage(HttpMethod.Get, FinApiConstants.UserAccountPath);
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _accessToken.Token);
var response = await SendRequestAsync<User>(request);
_dialogHandler?.DisplayInfoMessage($"Response: {response.Status}");
return response;
}
public async Task<Result> CreateUserAccountAsync(CreateUserAccountRequest createRequest)
{
try
{
// Get client token for user creation
var tokenResult = await GetJsonWebTokenAsync(FinApiConstants.TokenPath);
if (tokenResult.Status != ResultStatus.Success)
return Result.FromResult(tokenResult);
_dialogHandler?.DisplayInfoMessage("Creating user account...");
var request = new HttpRequestMessage(HttpMethod.Post, FinApiConstants.UserAccountPath);
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", tokenResult.Data.Token);
request.Content = new StringContent(
JsonSerializer.Serialize(createRequest),
Encoding.UTF8,
"application/json");
var response = await _httpClient.SendAsync(request);
var responseContent = await response.Content.ReadAsStringAsync();
if (response.StatusCode != HttpStatusCode.Created)
{
return Result.AsError($"User creation failed: {responseContent}");
}
_dialogHandler?.DisplayInfoMessage("User account created successfully");
return Result.AsSuccess();
}
catch (Exception ex)
{
return Result.AsError($"User account creation failed: {ex.Message}");
}
}
}
```
### **2. Circuit Breaker Pattern for External Services**
```csharp
public class CircuitBreaker
{
private readonly string _name;
private readonly int _failureThreshold;
private readonly TimeSpan _timeout;
private readonly TimeSpan _recoveryTimeout;
private readonly ILogger _logger;
private CircuitBreakerState _state = CircuitBreakerState.Closed;
private int _failureCount = 0;
private DateTime _lastFailureTime = DateTime.MinValue;
private readonly object _lock = new object();
public CircuitBreaker(string name, int failureThreshold, TimeSpan timeout, TimeSpan recoveryTimeout)
{
_name = name;
_failureThreshold = failureThreshold;
_timeout = timeout;
_recoveryTimeout = recoveryTimeout;
_logger = LogManager.GetCurrentClassLogger();
}
public async Task<Result<T>> ExecuteAsync<T>(Func<Task<Result<T>>> operation)
{
if (_state == CircuitBreakerState.Open)
{
if (DateTime.UtcNow - _lastFailureTime < _recoveryTimeout)
{
_logger.Warn("Circuit breaker {Name} is OPEN - rejecting call", _name);
return Result<T>.AsError("Circuit breaker is open");
}
// Transition to Half-Open state
lock (_lock)
{
if (_state == CircuitBreakerState.Open)
{
_state = CircuitBreakerState.HalfOpen;
_logger.Info("Circuit breaker {Name} transitioning to HALF-OPEN", _name);
}
}
}
try
{
var result = await operation();
if (result.Status == ResultStatus.Success)
{
OnSuccess();
}
else
{
OnFailure();
}
return result;
}
catch (Exception ex)
{
OnFailure();
return Result<T>.AsError($"Operation failed: {ex.Message}");
}
}
private void OnSuccess()
{
lock (_lock)
{
_failureCount = 0;
if (_state == CircuitBreakerState.HalfOpen)
{
_state = CircuitBreakerState.Closed;
_logger.Info("Circuit breaker {Name} transitioning to CLOSED", _name);
}
}
}
private void OnFailure()
{
lock (_lock)
{
_failureCount++;
_lastFailureTime = DateTime.UtcNow;
if (_failureCount >= _failureThreshold && _state != CircuitBreakerState.Open)
{
_state = CircuitBreakerState.Open;
_logger.Warn("Circuit breaker {Name} transitioning to OPEN after {FailureCount} failures", _name, _failureCount);
}
}
}
public CircuitBreakerState State => _state;
public int FailureCount => _failureCount;
}
public enum CircuitBreakerState
{
Closed,
Open,
HalfOpen
}
// Circuit breaker integration with external service client
public class ResilientExternalServiceClient
{
private readonly IExternalServiceClient _client;
private readonly CircuitBreaker _circuitBreaker;
private readonly IRetryPolicy _retryPolicy;
public ResilientExternalServiceClient(IExternalServiceClient client)
{
_client = client;
_circuitBreaker = new CircuitBreaker(
name: "ExternalService",
failureThreshold: 5,
timeout: TimeSpan.FromSeconds(30),
recoveryTimeout: TimeSpan.FromMinutes(1));
_retryPolicy = new ExponentialRetryPolicy(
maxRetries: 3,
baseDelay: TimeSpan.FromSeconds(1));
}
public async Task<Result<T>> CallServiceAsync<T>(Func<Task<Result<T>>> serviceCall)
{
return await _circuitBreaker.ExecuteAsync(async () =>
{
return await _retryPolicy.ExecuteAsync(serviceCall);
});
}
}
```
### **3. Retry Policy Pattern**
```csharp
public interface IRetryPolicy
{
Task<Result<T>> ExecuteAsync<T>(Func<Task<Result<T>>> operation);
}
public class ExponentialRetryPolicy : IRetryPolicy
{
private readonly int _maxRetries;
private readonly TimeSpan _baseDelay;
private readonly double _multiplier;
private readonly TimeSpan _maxDelay;
private readonly ILogger _logger;
public ExponentialRetryPolicy(
int maxRetries = 3,
TimeSpan? baseDelay = null,
double multiplier = 2.0,
TimeSpan? maxDelay = null)
{
_maxRetries = maxRetries;
_baseDelay = baseDelay ?? TimeSpan.FromSeconds(1);
_multiplier = multiplier;
_maxDelay = maxDelay ?? TimeSpan.FromMinutes(1);
_logger = LogManager.GetCurrentClassLogger();
}
public async Task<Result<T>> ExecuteAsync<T>(Func<Task<Result<T>>> operation)
{
var attempt = 0;
var delay = _baseDelay;
while (attempt <= _maxRetries)
{
try
{
var result = await operation();
if (result.Status == ResultStatus.Success || !IsRetriableError(result))
{
if (attempt > 0)
{
_logger.Info("Operation succeeded after {Attempts} attempts", attempt + 1);
}
return result;
}
if (attempt < _maxRetries)
{
_logger.Warn("Operation failed (attempt {Attempt}/{MaxAttempts}): {Error}. Retrying in {Delay}ms",
attempt + 1, _maxRetries + 1, result.Error, delay.TotalMilliseconds);
await Task.Delay(delay);
delay = TimeSpan.FromMilliseconds(Math.Min(delay.TotalMilliseconds * _multiplier, _maxDelay.TotalMilliseconds));
}
attempt++;
}
catch (Exception ex)
{
if (attempt < _maxRetries && IsRetriableException(ex))
{
_logger.Warn(ex, "Operation threw exception (attempt {Attempt}/{MaxAttempts}). Retrying in {Delay}ms",
attempt + 1, _maxRetries + 1, delay.TotalMilliseconds);
await Task.Delay(delay);
delay = TimeSpan.FromMilliseconds(Math.Min(delay.TotalMilliseconds * _multiplier, _maxDelay.TotalMilliseconds));
attempt++;
}
else
{
return Result<T>.AsError($"Operation failed after {attempt + 1} attempts: {ex.Message}");
}
}
}
_logger.Error("Operation failed after {MaxAttempts} attempts", _maxRetries + 1);
return Result<T>.AsError($"Operation failed after {_maxRetries + 1} attempts");
}
private bool IsRetriableError(Result result)
{
// Define which errors are worth retrying
if (result.Error == null) return false;
return result.Error.Contains("timeout") ||
result.Error.Contains("connection") ||
result.Error.Contains("network") ||
result.Error.Contains("503") ||
result.Error.Contains("502") ||
result.Error.Contains("504");
}
private bool IsRetriableException(Exception exception)
{
return exception is TimeoutException ||
exception is HttpRequestException ||
exception is SocketException ||
(exception is TaskCanceledException && exception.InnerException is TimeoutException);
}
}
// Polly-style retry policy for more advanced scenarios
public class AdvancedRetryPolicy : IRetryPolicy
{
public async Task<Result<T>> ExecuteAsync<T>(Func<Task<Result<T>>> operation)
{
var retryPolicy = Policy
.Handle<HttpRequestException>()
.Or<TimeoutException>()
.Or<TaskCanceledException>()
.WaitAndRetryAsync(
retryCount: 3,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: (outcome, timespan, retryCount, context) =>
{
LogManager.GetCurrentClassLogger().Warn(
"Retry {RetryCount} in {Delay}ms due to: {Exception}",
retryCount, timespan.TotalMilliseconds, outcome.Exception?.Message);
});
try
{
var result = await retryPolicy.ExecuteAsync(operation);
return result;
}
catch (Exception ex)
{
return Result<T>.AsError($"All retry attempts failed: {ex.Message}");
}
}
}
```
## Data Transformation and Mapping Patterns
### **1. Entity-DTO Conversion Pattern**
```csharp
public static class ObjectMapper
{
private static readonly IMapper _mapper;
static ObjectMapper()
{
var configuration = new MapperConfiguration(cfg =>
{
// Account mappings
cfg.CreateMap<Account, AccountDTO>()
.ForMember(dest => dest.ContactPersons, opt => opt.MapFrom(src => src.ContactPersons))
.ForMember(dest => dest.Addresses, opt => opt.MapFrom(src => src.Addresses));
cfg.CreateMap<AccountDTO, Account>()
.ForMember(dest => dest.ContactPersons, opt => opt.MapFrom(src => src.ContactPersons))
.ForMember(dest => dest.Addresses, opt => opt.MapFrom(src => src.Addresses));
// Custom mappings with transformations
cfg.CreateMap<Account, AccountSummaryDTO>()
.ForMember(dest => dest.TotalContracts, opt => opt.MapFrom(src => src.Contracts.Count))
.ForMember(dest => dest.ActiveContracts, opt => opt.MapFrom(src => src.Contracts.Count(c => c.IsActive)))
.ForMember(dest => dest.LastActivity, opt => opt.MapFrom(src => src.LastModified));
// Complex nested mappings
cfg.CreateMap<Receipt, ReceiptDTO>()
.ForMember(dest => dest.Items, opt => opt.MapFrom(src => src.ReceiptItems))
.ForMember(dest => dest.CustomerName, opt => opt.MapFrom(src => src.Customer.CompanyName))
.ForMember(dest => dest.PaymentMethodName, opt => opt.MapFrom(src => src.PaymentMethod.Name));
});
configuration.AssertConfigurationIsValid();
_mapper = configuration.CreateMapper();
}
public static TDestination Map<TDestination>(object source)
{
return _mapper.Map<TDestination>(source);
}
public static TDestination Map<TSource, TDestination>(TSource source, TDestination destination)
{
return _mapper.Map(source, destination);
}
public static IEnumerable<TDestination> Map<TDestination>(IEnumerable source)
{
return _mapper.Map<IEnumerable<TDestination>>(source);
}
}
// Custom conversion methods for complex transformations
public static class ConversionExtensions
{
public static Account ConvertAccountDTOToAccount(AccountDTO accountDTO)
{
if (accountDTO == null) return null;
var account = ObjectMapper.Map<Account>(accountDTO);
// Custom logic that AutoMapper can't handle
if (accountDTO.CustomFields != null)
{
account.CustomProperties = accountDTO.CustomFields
.Select(cf => new CustomProperty
{
Name = cf.Key,
Value = cf.Value,
DataType = InferDataType(cf.Value)
})
.ToList();
}
// Handle special business logic
if (account.CustomerType == CustomerType.Business && string.IsNullOrEmpty(account.VatNumber))
{
throw new ValidationException("VAT number is required for business customers");
}
return account;
}
public static AccountDTO ConvertAccountToAccountDTO(Account account)
{
if (account == null) return null;
var accountDTO = ObjectMapper.Map<AccountDTO>(account);
// Convert custom properties back to dictionary
if (account.CustomProperties?.Any() == true)
{
accountDTO.CustomFields = account.CustomProperties
.ToDictionary(cp => cp.Name, cp => cp.Value);
}
// Apply security filtering - don't expose sensitive data
if (!HasPermissionToViewSensitiveData())
{
accountDTO.BankAccountNumber = null;
accountDTO.CreditLimit = null;
}
return accountDTO;
}
private static string InferDataType(object value)
{
return value switch
{
int or long => "Integer",
decimal or double or float => "Decimal",
bool => "Boolean",
DateTime => "DateTime",
_ => "String"
};
}
private static bool HasPermissionToViewSensitiveData()
{
// Implementation would check current user permissions
return true; // Simplified for example
}
}
```
### **2. External Data Format Transformation**
```csharp
public class DataTransformationService
{
public async Task<Result<List<T>>> TransformExternalDataAsync<T>(
Stream inputStream,
DataFormat inputFormat,
IDataTransformationRules<T> transformationRules) where T : class
{
try
{
var rawData = await ReadRawDataAsync(inputStream, inputFormat);
var transformedData = new List<T>();
foreach (var rawItem in rawData)
{
var transformResult = await transformationRules.TransformAsync(rawItem);
if (transformResult.Status == ResultStatus.Success)
{
transformedData.Add(transformResult.Data);
}
else
{
LogTransformationError(rawItem, transformResult.Error);
}
}
return Result<List<T>>.AsSuccess(transformedData);
}
catch (Exception ex)
{
return Result<List<T>>.AsError($"Data transformation failed: {ex.Message}");
}
}
private async Task<List<Dictionary<string, object>>> ReadRawDataAsync(Stream inputStream, DataFormat format)
{
return format switch
{
DataFormat.Json => await ReadJsonDataAsync(inputStream),
DataFormat.Xml => await ReadXmlDataAsync(inputStream),
DataFormat.Csv => await ReadCsvDataAsync(inputStream),
DataFormat.Edi => await ReadEdiDataAsync(inputStream),
_ => throw new NotSupportedException($"Data format {format} is not supported")
};
}
private async Task<List<Dictionary<string, object>>> ReadCsvDataAsync(Stream inputStream)
{
var data = new List<Dictionary<string, object>>();
using var reader = new StreamReader(inputStream);
var headerLine = await reader.ReadLineAsync();
if (headerLine == null) return data;
var headers = headerLine.Split(',').Select(h => h.Trim()).ToArray();
string line;
while ((line = await reader.ReadLineAsync()) != null)
{
var values = line.Split(',');
var row = new Dictionary<string, object>();
for (int i = 0; i < headers.Length && i < values.Length; i++)
{
row[headers[i]] = values[i].Trim();
}
data.Add(row);
}
return data;
}
}
// Transformation rules for specific data types
public class CustomerTransformationRules : IDataTransformationRules<CustomerDTO>
{
public async Task<Result<CustomerDTO>> TransformAsync(Dictionary<string, object> rawData)
{
try
{
var customer = new CustomerDTO
{
CompanyName = GetStringValue(rawData, "company_name", "CompanyName"),
CustomerNumber = GetIntValue(rawData, "customer_no", "CustomerNumber"),
Email = GetStringValue(rawData, "email", "Email"),
PhoneNumber = GetStringValue(rawData, "phone", "PhoneNumber")
};
// Apply transformation rules
customer.CompanyName = CleanCompanyName(customer.CompanyName);
customer.Email = NormalizeEmail(customer.Email);
customer.PhoneNumber = NormalizePhoneNumber(customer.PhoneNumber);
// Validate transformed data
var validationResult = await ValidateCustomer(customer);
if (validationResult.Status != ResultStatus.Success)
return Result<CustomerDTO>.FromResult(validationResult);
return Result<CustomerDTO>.AsSuccess(customer);
}
catch (Exception ex)
{
return Result<CustomerDTO>.AsError($"Customer transformation failed: {ex.Message}");
}
}
private string GetStringValue(Dictionary<string, object> data, params string[] keys)
{
foreach (var key in keys)
{
if (data.TryGetValue(key, out var value))
return value?.ToString();
}
return null;
}
private int GetIntValue(Dictionary<string, object> data, params string[] keys)
{
foreach (var key in keys)
{
if (data.TryGetValue(key, out var value) && int.TryParse(value?.ToString(), out var intValue))
return intValue;
}
return 0;
}
private string CleanCompanyName(string companyName)
{
if (string.IsNullOrWhiteSpace(companyName)) return companyName;
// Remove extra whitespace and normalize
return Regex.Replace(companyName.Trim(), @"\s+", " ");
}
private string NormalizeEmail(string email)
{
if (string.IsNullOrWhiteSpace(email)) return email;
return email.Trim().ToLowerInvariant();
}
private string NormalizePhoneNumber(string phoneNumber)
{
if (string.IsNullOrWhiteSpace(phoneNumber)) return phoneNumber;
// Remove all non-digit characters except +
var cleaned = Regex.Replace(phoneNumber, @"[^\d+]", "");
// Add default country code if missing
if (!cleaned.StartsWith("+") && cleaned.Length >= 10)
{
cleaned = "+49" + cleaned; // Default to German numbers
}
return cleaned;
}
private async Task<Result> ValidateCustomer(CustomerDTO customer)
{
if (string.IsNullOrWhiteSpace(customer.CompanyName))
return Result.AsError("Company name is required");
if (customer.CustomerNumber <= 0)
return Result.AsError("Valid customer number is required");
if (!string.IsNullOrEmpty(customer.Email) && !IsValidEmail(customer.Email))
return Result.AsError("Invalid email format");
return Result.AsSuccess();
}
private bool IsValidEmail(string email)
{
try
{
var mailAddress = new MailAddress(email);
return mailAddress.Address == email;
}
catch
{
return false;
}
}
}
```
## Message Queue and Event Processing Patterns
### **1. Event-Driven Integration Pattern**
```csharp
public interface IEventPublisher
{
Task<Result> PublishAsync<T>(T eventData, string eventType, Dictionary<string, string> metadata = null) where T : class;
}
public interface IEventSubscriber
{
Task<Result> SubscribeAsync<T>(string eventType, Func<T, Task<Result>> handler) where T : class;
Task<Result> UnsubscribeAsync(string eventType);
}
public class MessageQueueEventService : IEventPublisher, IEventSubscriber
{
private readonly IMessageQueue _messageQueue;
private readonly IEventSerializer _serializer;
private readonly ILogger<MessageQueueEventService> _logger;
private readonly Dictionary<string, List<Func<object, Task<Result>>>> _handlers = new();
public async Task<Result> PublishAsync<T>(T eventData, string eventType, Dictionary<string, string> metadata = null) where T : class
{
try
{
var message = new EventMessage
{
Id = Guid.NewGuid().ToString(),
EventType = eventType,
Data = _serializer.Serialize(eventData),
Metadata = metadata ?? new Dictionary<string, string>(),
Timestamp = DateTime.UtcNow
};
message.Metadata["ContentType"] = typeof(T).FullName;
message.Metadata["Publisher"] = Environment.MachineName;
await _messageQueue.PublishAsync(eventType, message);
_logger.LogInformation("Published event {EventType} with ID {EventId}", eventType, message.Id);
return Result.AsSuccess();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to publish event {EventType}", eventType);
return Result.AsError($"Event publishing failed: {ex.Message}");
}
}
public async Task<Result> SubscribeAsync<T>(string eventType, Func<T, Task<Result>> handler) where T : class
{
try
{
// Wrap the typed handler
var wrappedHandler = new Func<object, Task<Result>>(async data =>
{
if (data is T typedData)
{
return await handler(typedData);
}
return Result.AsError($"Invalid data type for event {eventType}");
});
if (!_handlers.ContainsKey(eventType))
{
_handlers[eventType] = new List<Func<object, Task<Result>>>();
// Subscribe to the queue
await _messageQueue.SubscribeAsync(eventType, ProcessEventMessage);
}
_handlers[eventType].Add(wrappedHandler);
_logger.LogInformation("Subscribed to event {EventType}", eventType);
return Result.AsSuccess();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to subscribe to event {EventType}", eventType);
return Result.AsError($"Event subscription failed: {ex.Message}");
}
}
private async Task<Result> ProcessEventMessage(EventMessage message)
{
try
{
if (!_handlers.ContainsKey(message.EventType))
{
_logger.LogWarning("No handlers registered for event type {EventType}", message.EventType);
return Result.AsSuccess(); // Not an error, just no handlers
}
// Deserialize the data
var contentType = message.Metadata.GetValueOrDefault("ContentType");
var dataType = Type.GetType(contentType);
var eventData = _serializer.Deserialize(message.Data, dataType);
// Process with all registered handlers
var handlers = _handlers[message.EventType];
var processingTasks = handlers.Select(handler => ProcessWithHandler(handler, eventData, message));
var results = await Task.WhenAll(processingTasks);
// Check if any handlers failed
var failures = results.Where(r => r.Status != ResultStatus.Success).ToList();
if (failures.Any())
{
_logger.LogError("Event processing failed for {EventType}: {Errors}",
message.EventType, string.Join("; ", failures.Select(f => f.Error)));
return Result.AsError($"Event processing failed: {failures.Count} handlers failed");
}
_logger.LogInformation("Successfully processed event {EventType} with {HandlerCount} handlers",
message.EventType, handlers.Count);
return Result.AsSuccess();
}
catch (Exception ex)
{
_logger.LogError(ex, "Unexpected error processing event {EventType}", message.EventType);
return Result.AsError($"Event processing failed: {ex.Message}");
}
}
private async Task<Result> ProcessWithHandler(Func<object, Task<Result>> handler, object eventData, EventMessage message)
{
try
{
return await handler(eventData);
}
catch (Exception ex)
{
_logger.LogError(ex, "Handler failed for event {EventType} (ID: {EventId})",
message.EventType, message.Id);
return Result.AsError($"Handler execution failed: {ex.Message}");
}
}
}
// Usage example - Account events
public class AccountEventPublisher
{
private readonly IEventPublisher _eventPublisher;
public async Task<Result> NotifyAccountCreatedAsync(AccountDTO account)
{
var eventData = new AccountCreatedEvent
{
AccountId = account.I3D,
CompanyName = account.CompanyName,
CustomerNumber = account.CustomerNumber,
CreatedBy = account.CreatedBy,
CreatedDate = DateTime.UtcNow
};
return await _eventPublisher.PublishAsync(eventData, "AccountCreated", new Dictionary<string, string>
{
["CustomerId"] = account.CustomerNumber.ToString(),
["Priority"] = "Normal"
});
}
public async Task<Result> NotifyAccountUpdatedAsync(AccountDTO oldAccount, AccountDTO newAccount)
{
var eventData = new AccountUpdatedEvent
{
AccountId = newAccount.I3D,
OldValues = ObjectMapper.Map<Dictionary<string, object>>(oldAccount),
NewValues = ObjectMapper.Map<Dictionary<string, object>>(newAccount),
ModifiedBy = newAccount.ModifiedBy,
ModifiedDate = DateTime.UtcNow
};
return await _eventPublisher.PublishAsync(eventData, "AccountUpdated");
}
}
// Event handlers
public class AccountEventHandler
{
private readonly IEmailService _emailService;
private readonly IAuditService _auditService;
public async Task<Result> HandleAccountCreatedAsync(AccountCreatedEvent eventData)
{
try
{
// Send welcome email
await _emailService.SendWelcomeEmailAsync(eventData.AccountId);
// Log audit entry
await _auditService.LogBusinessActionAsync(
"ACCOUNT_CREATED",
"Account",
eventData.AccountId,
new Dictionary<string, object>
{
["CompanyName"] = eventData.CompanyName,
["CustomerNumber"] = eventData.CustomerNumber
});
return Result.AsSuccess();
}
catch (Exception ex)
{
return Result.AsError($"Failed to handle AccountCreated event: {ex.Message}");
}
}
}
```
### **2. File-Based Integration Pattern**
```csharp
public class FileIntegrationService
{
private readonly IFileProcessor _fileProcessor;
private readonly IDataTransformationService _transformationService;
private readonly ILogger<FileIntegrationService> _logger;
public async Task<Result<FileProcessingResult>> ProcessFileAsync(
string filePath,
FileIntegrationConfig config)
{
try
{
var result = new FileProcessingResult
{
FilePath = filePath,
StartTime = DateTime.UtcNow,
Config = config
};
// Validate file exists and is readable
if (!File.Exists(filePath))
{
return Result<FileProcessingResult>.AsError($"File not found: {filePath}");
}
// Create backup if required
if (config.CreateBackup)
{
var backupPath = CreateBackupFile(filePath);
result.BackupPath = backupPath;
}
// Process the file
using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read);
switch (config.ProcessingMode)
{
case FileProcessingMode.Import:
await ProcessImportFileAsync(fileStream, config, result);
break;
case FileProcessingMode.Export:
await ProcessExportFileAsync(fileStream, config, result);
break;
case FileProcessingMode.Transform:
await ProcessTransformFileAsync(fileStream, config, result);
break;
default:
return Result<FileProcessingResult>.AsError($"Unsupported processing mode: {config.ProcessingMode}");
}
// Archive processed file if required
if (config.ArchiveAfterProcessing)
{
var archivePath = ArchiveFile(filePath, config.ArchiveDirectory);
result.ArchivePath = archivePath;
}
result.EndTime = DateTime.UtcNow;
result.Success = true;
_logger.LogInformation("File processing completed: {FilePath} - {RecordsProcessed} records in {Duration}ms",
filePath, result.RecordsProcessed, (result.EndTime - result.StartTime).TotalMilliseconds);
return Result<FileProcessingResult>.AsSuccess(result);
}
catch (Exception ex)
{
_logger.LogError(ex, "File processing failed: {FilePath}", filePath);
return Result<FileProcessingResult>.AsError($"File processing failed: {ex.Message}");
}
}
private async Task ProcessImportFileAsync(
Stream fileStream,
FileIntegrationConfig config,
FileProcessingResult result)
{
var rawData = await _fileProcessor.ReadDataAsync(fileStream, config.FileFormat);
foreach (var rawRecord in rawData)
{
try
{
var transformedRecord = await _transformationService
.TransformAsync(rawRecord, config.TransformationRules);
if (transformedRecord.Status == ResultStatus.Success)
{
var saveResult = await SaveRecordAsync(transformedRecord.Data, config);
if (saveResult.Status == ResultStatus.Success)
{
result.RecordsProcessed++;
}
else
{
result.Errors.Add($"Failed to save record: {saveResult.Error}");
result.RecordsFailed++;
}
}
else
{
result.Errors.Add($"Failed to transform record: {transformedRecord.Error}");
result.RecordsFailed++;
}
}
catch (Exception ex)
{
result.Errors.Add($"Error processing record: {ex.Message}");
result.RecordsFailed++;
}
}
}
private string CreateBackupFile(string originalPath)
{
var directory = Path.GetDirectoryName(originalPath);
var fileName = Path.GetFileNameWithoutExtension(originalPath);
var extension = Path.GetExtension(originalPath);
var timestamp = DateTime.Now.ToString("yyyyMMdd_HHmmss");
var backupPath = Path.Combine(directory, $"{fileName}_backup_{timestamp}{extension}");
File.Copy(originalPath, backupPath);
return backupPath;
}
private string ArchiveFile(string filePath, string archiveDirectory)
{
if (!Directory.Exists(archiveDirectory))
{
Directory.CreateDirectory(archiveDirectory);
}
var fileName = Path.GetFileName(filePath);
var archivePath = Path.Combine(archiveDirectory, fileName);
// Add timestamp if file already exists
if (File.Exists(archivePath))
{
var nameWithoutExt = Path.GetFileNameWithoutExtension(fileName);
var extension = Path.GetExtension(fileName);
var timestamp = DateTime.Now.ToString("yyyyMMdd_HHmmss");
archivePath = Path.Combine(archiveDirectory, $"{nameWithoutExt}_{timestamp}{extension}");
}
File.Move(filePath, archivePath);
return archivePath;
}
}
```
## Integration Best Practices and Guidelines
### **1. Error Handling and Recovery**
- **Graceful Degradation**: System continues to function when external services are unavailable
- **Retry Logic**: Implement exponential backoff for transient failures
- **Circuit Breakers**: Prevent cascade failures in distributed systems
- **Dead Letter Queues**: Handle messages that cannot be processed
### **2. Data Consistency and Synchronization**
- **Idempotent Operations**: Operations can be safely repeated without side effects
- **Event Sourcing**: Track all changes as events for audit and replay capability
- **Eventual Consistency**: Accept temporary inconsistency for better performance
- **Compensation Patterns**: Implement compensating actions for failed operations
### **3. Security and Authentication**
- **API Keys and Tokens**: Secure external API authentication
- **Certificate Validation**: Verify SSL/TLS certificates in production
- **Data Encryption**: Encrypt sensitive data in transit and at rest
- **Input Validation**: Validate all external data before processing
### **4. Monitoring and Observability**
- **Integration Health Checks**: Monitor external service availability
- **Performance Metrics**: Track response times and throughput
- **Error Tracking**: Log and monitor integration failures
- **Distributed Tracing**: Track requests across service boundaries
## Conclusion
The Centron application implements comprehensive integration patterns that provide:
- **Resilience**: Circuit breakers and retry policies ensure system stability
- **Flexibility**: Multiple integration channels support various external systems
- **Reliability**: Error handling and recovery mechanisms prevent data loss
- **Scalability**: Asynchronous processing and message queues handle high volumes
- **Maintainability**: Clear patterns and abstractions simplify integration management
- **Security**: Authentication and validation protect against external threats
- **Observability**: Comprehensive monitoring provides visibility into integration health
These integration patterns enable seamless communication between the Centron application and external systems while maintaining reliability, security, and performance under various conditions.