Channel configuration
IngestChannel<T> supports two configuration modes: auto-configuration from ElasticsearchTypeContext, and manual configuration with explicit strategies via the IngestStrategies and BootstrapStrategies factory methods.
Channel configuration determines how your documents reach Elasticsearch: which index they target, which templates are created, and how aliases are managed. The composable strategy pattern lets you start with zero-config defaults and override individual behaviors as your needs grow.
When you provide an ElasticsearchTypeContext (generated by Elastic.Mapping), strategies are resolved automatically based on your entity declaration:
var options = new IngestChannelOptions<Product>(transport, MyContext.Product.Context);
using var channel = new IngestChannel<Product>(options);
Auto-resolution rules:
| Entity target | Ingest | Bootstrap | Provisioning | Alias |
|---|---|---|---|---|
| DataStream | DataStreamIngestStrategy | Component + data stream templates | Always create | No alias |
| Index | TypeContextIndexIngestStrategy | Component + index templates | Hash-based reuse (if [ContentHash] present) | Latest + search (if aliases configured) |
| WiredStream | WiredStreamIngestStrategy | No-op | Always create | No alias |
Use the IngestStrategies and BootstrapStrategies factory methods for full control:
// Data stream with 30-day retention
var strategy = IngestStrategies.DataStream<LogEntry>(
    LoggingContext.LogEntry.Context, "30d");
var options = new IngestChannelOptions<LogEntry>(transport, strategy,
    LoggingContext.LogEntry.Context);

// Data stream with ILM
var strategy = IngestStrategies.DataStream<LogEntry>(
    LoggingContext.LogEntry.Context,
    BootstrapStrategies.DataStreamWithIlm("logs-policy", hotMaxAge: "7d", deleteMinAge: "90d"));
var options = new IngestChannelOptions<LogEntry>(transport, strategy,
    LoggingContext.LogEntry.Context);

// Index with ILM
var strategy = IngestStrategies.Index<Product>(
    CatalogContext.Product.Context,
    BootstrapStrategies.IndexWithIlm("my-policy"));
var options = new IngestChannelOptions<Product>(transport, strategy,
    CatalogContext.Product.Context);
| Option | Type | Description |
|---|---|---|
| Strategy | IIngestStrategy<TEvent> | The composed strategy defining all channel behaviors (set via constructor) |
| TypeContext | ElasticsearchTypeContext? | Source-generated context for auto-configuration and mappings |
Buffer behavior is configured through BufferOptions:
var options = new IngestChannelOptions<MyDoc>(transport, MyContext.MyDoc.Context)
{
    BufferOptions = new BufferOptions
    {
        InboundBufferMaxSize = 500_000,
        OutboundBufferMaxSize = 5_000,
        OutboundBufferMaxLifetime = TimeSpan.FromSeconds(3),
        ExportMaxConcurrency = 8,
        ExportMaxRetries = 5
    }
};
See push model for the full buffer options reference.
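As a rough illustration of how the two outbound settings might interact, the sketch below flushes a batch when it is full or when it has waited long enough. This is an assumption inferred from the option names, not the library's implementation; `ShouldFlush` is a hypothetical helper, and the push model reference remains the authority on the actual behavior.

```csharp
using System;

// Hypothetical helper, not part of the library.
// Assumption: a non-empty batch is exported once it holds maxSize items
// or once maxLifetime has elapsed since buffering began, whichever comes first.
static bool ShouldFlush(int count, TimeSpan age, int maxSize, TimeSpan maxLifetime) =>
    count > 0 && (count >= maxSize || age >= maxLifetime);

// Values mirror OutboundBufferMaxSize and OutboundBufferMaxLifetime from the example above.
var maxSize = 5_000;
var maxLifetime = TimeSpan.FromSeconds(3);

// Size limit reached before the lifetime expires.
Console.WriteLine(ShouldFlush(5_000, TimeSpan.FromSeconds(1), maxSize, maxLifetime));
// Lifetime reached with a partially filled batch.
Console.WriteLine(ShouldFlush(120, TimeSpan.FromSeconds(3), maxSize, maxLifetime));
// Neither limit reached: keep buffering.
Console.WriteLine(ShouldFlush(120, TimeSpan.FromSeconds(1), maxSize, maxLifetime));
```

Under this reading, a larger OutboundBufferMaxSize favors throughput on busy channels, while a shorter OutboundBufferMaxLifetime bounds how long a document can wait on quiet ones.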
| Option | Type | Description |
|---|---|---|
| SerializerContext | JsonSerializerContext? | Source-generated serializer context for AOT |
| EventWriter | IElasticsearchEventWriter<TEvent>? | Custom per-document serialization |
See serialization for details.
Register callback listeners to observe channel behavior:
var listeners = new List<IChannelCallbacks<MyDoc, BulkResponse>>
{
    new MyDiagnosticsListener()
};
var channel = new IngestChannel<MyDoc>(options, listeners);
Callback hooks include:
- Inbound: PublishToInboundChannelSuccess, PublishToInboundChannelFailure
- Export: ExportItemsAttempt, ExportResponse, ExportException
- Retry: ExportRetryableCount, ExportRetry, ExportMaxRetries
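BootstrapElasticsearchAsync creates the templates for the channel; the BootstrapMethod argument controls how a failure is reported:
// Throws on failure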
await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure);
// Silently returns false on failure
var success = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Silent);
// Skip bootstrap entirely
await channel.BootstrapElasticsearchAsync(BootstrapMethod.None);
Bootstrap is idempotent: if templates exist with the same content hash, the PUT operations are skipped.
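The hash check described above can be pictured with a minimal, self-contained sketch. It is not the library's implementation: the in-memory dictionary stands in for the cluster, and `ComputeHash`, `PutTemplateIfChanged`, and the choice of SHA-256 are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

// Hypothetical in-memory stand-in for the cluster: template name -> stored content hash.
var storedHashes = new Dictionary<string, string>();
var putCount = 0;

// Hash the template body so identical content always yields the same value.
// SHA-256 is an assumption; the source does not say which hash is used.
static string ComputeHash(string templateJson) =>
    Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(templateJson)));

// Returns true when a PUT was issued, false when it was skipped.
bool PutTemplateIfChanged(string name, string templateJson)
{
    var hash = ComputeHash(templateJson);
    if (storedHashes.TryGetValue(name, out var existing) && existing == hash)
        return false; // same content hash: skip the PUT

    storedHashes[name] = hash; // a real client would send the PUT request here
    putCount++;
    return true;
}

var template = "{\"settings\":{\"number_of_shards\":1}}";
var first = PutTemplateIfChanged("logs-template", template);   // new template: PUT
var second = PutTemplateIfChanged("logs-template", template);  // unchanged: skipped
var third = PutTemplateIfChanged("logs-template",
    "{\"settings\":{\"number_of_shards\":2}}");                 // changed content: PUT

Console.WriteLine($"{first} {second} {third} puts={putCount}"); // prints "True False True puts=2"
```

Because the decision depends only on the template content, calling bootstrap again on every application start costs a comparison rather than a write.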
To use manual rollover, pass a ManualRolloverStrategy to the IngestStrategy<T> constructor along with the other strategies, then hand the composed strategy to the channel options:
var strategy = new IngestStrategy<LogEntry>(
    LoggingContext.LogEntry.Context,
    BootstrapStrategies.DataStream(),
    new DataStreamIngestStrategy<LogEntry>("logs-myapp-production", "/_bulk"),
    new AlwaysCreateProvisioning(),
    new NoAliasStrategy(),
    new ManualRolloverStrategy()
);
var options = new IngestChannelOptions<LogEntry>(transport, strategy,
    LoggingContext.LogEntry.Context);
using var channel = new IngestChannel<LogEntry>(options);
// Rollover with conditions
await channel.RolloverAsync(maxAge: "7d", maxSize: "50gb");
// Unconditional rollover
await channel.RolloverAsync();
See rollover strategies and rollover API.