Channel configuration

IngestChannel<T> supports two configuration modes: auto-configuration from ElasticsearchTypeContext, and manual configuration with explicit strategies via the IngestStrategies and BootstrapStrategies factory methods.

Channel configuration determines how your documents reach Elasticsearch -- which index they target, what templates are created, and how aliases are managed. The composable strategy pattern lets you start with zero-config defaults and override individual behaviors as your needs grow.

When you provide an ElasticsearchTypeContext (generated by Elastic.Mapping), strategies are resolved automatically based on your entity declaration:

var options = new IngestChannelOptions<Product>(transport, MyContext.Product.Context);
using var channel = new IngestChannel<Product>(options);

Auto-resolution rules:

| Entity target | Ingest | Bootstrap | Provisioning | Alias |
| --- | --- | --- | --- | --- |
| DataStream | DataStreamIngestStrategy | Component + data stream templates | Always create | No alias |
| Index | TypeContextIndexIngestStrategy | Component + index templates | Hash-based reuse (if [ContentHash] present) | Latest + search (if aliases configured) |
| WiredStream | WiredStreamIngestStrategy | No-op | Always create | No alias |
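
The rules above can be read as a single lookup keyed on the entity target. The sketch below restates them in code purely as an illustration; `EntityTarget`, `ResolvedStrategies`, and `AutoResolutionSketch` are hypothetical names, not library types, and the fallbacks used for an Index without [ContentHash] or without aliases ("Always create", "No alias") are assumptions that the rules above do not state.

```csharp
using System;

// Hypothetical types for illustration only; they are not part of the library.
public enum EntityTarget { DataStream, Index, WiredStream }

public record ResolvedStrategies(string Ingest, string Bootstrap, string Provisioning, string Alias);

public static class AutoResolutionSketch
{
    // Mirrors the auto-resolution rules row by row.
    public static ResolvedStrategies Resolve(EntityTarget target, bool hasContentHash, bool aliasesConfigured) =>
        target switch
        {
            EntityTarget.DataStream => new ResolvedStrategies(
                "DataStreamIngestStrategy", "Component + data stream templates", "Always create", "No alias"),
            EntityTarget.Index => new ResolvedStrategies(
                "TypeContextIndexIngestStrategy",
                "Component + index templates",
                // Assumption: without [ContentHash] the index is always created.
                hasContentHash ? "Hash-based reuse" : "Always create",
                // Assumption: without configured aliases no alias is managed.
                aliasesConfigured ? "Latest + search" : "No alias"),
            EntityTarget.WiredStream => new ResolvedStrategies(
                "WiredStreamIngestStrategy", "No-op", "Always create", "No alias"),
            _ => throw new ArgumentOutOfRangeException(nameof(target))
        };
}
```

For example, `AutoResolutionSketch.Resolve(EntityTarget.Index, hasContentHash: true, aliasesConfigured: true)` yields the Index row with hash-based reuse and the latest and search aliases.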

Use the IngestStrategies and BootstrapStrategies factory methods for full control:

// Data stream with 30-day retention
var retentionStrategy = IngestStrategies.DataStream<LogEntry>(
    LoggingContext.LogEntry.Context, "30d");
var retentionOptions = new IngestChannelOptions<LogEntry>(transport, retentionStrategy,
    LoggingContext.LogEntry.Context);

// Data stream with ILM
var ilmStrategy = IngestStrategies.DataStream<LogEntry>(
    LoggingContext.LogEntry.Context,
    BootstrapStrategies.DataStreamWithIlm("logs-policy", hotMaxAge: "7d", deleteMinAge: "90d"));
var ilmOptions = new IngestChannelOptions<LogEntry>(transport, ilmStrategy,
    LoggingContext.LogEntry.Context);

// Index with ILM
var indexStrategy = IngestStrategies.Index<Product>(
    CatalogContext.Product.Context,
    BootstrapStrategies.IndexWithIlm("my-policy"));
var indexOptions = new IngestChannelOptions<Product>(transport, indexStrategy,
    CatalogContext.Product.Context);

| Option | Type | Description |
| --- | --- | --- |
| Strategy | IIngestStrategy<TEvent> | The composed strategy defining all channel behaviors (set via constructor) |
| TypeContext | ElasticsearchTypeContext? | Source-generated context for auto-configuration and mappings |

Buffer behavior is configured through BufferOptions:

var options = new IngestChannelOptions<MyDoc>(transport, MyContext.MyDoc.Context)
{
    BufferOptions = new BufferOptions
    {
        InboundBufferMaxSize = 500_000,
        OutboundBufferMaxSize = 5_000,
        OutboundBufferMaxLifetime = TimeSpan.FromSeconds(3),
        ExportMaxConcurrency = 8,
        ExportMaxRetries = 5
    }
};

See push model for the full buffer options reference.

| Option | Type | Description |
| --- | --- | --- |
| SerializerContext | JsonSerializerContext? | Source-generated serializer context for AOT |
| EventWriter | IElasticsearchEventWriter<TEvent>? | Custom per-document serialization |

See serialization for details.

Register callback listeners to observe channel behavior:

var listeners = new List<IChannelCallbacks<MyDoc, BulkResponse>>
{
    new MyDiagnosticsListener()
};
var channel = new IngestChannel<MyDoc>(options, listeners);

Callback hooks include:

  • Inbound: PublishToInboundChannelSuccess, PublishToInboundChannelFailure
  • Export: ExportItemsAttempt, ExportResponse, ExportException
  • Retry: ExportRetryableCount, ExportRetry, ExportMaxRetries
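
BootstrapElasticsearchAsync takes a BootstrapMethod argument that controls how bootstrap failures are handled:

// Throws on failure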
await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure);

// Silently returns false on failure
var success = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Silent);

// Skip bootstrap entirely
await channel.BootstrapElasticsearchAsync(BootstrapMethod.None);

Bootstrap is idempotent: if templates exist with the same content hash, the PUT operations are skipped.
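
To make the skip-on-same-hash idea concrete, here is a minimal, self-contained sketch. It is not the library's implementation: `TemplateBootstrapSketch`, `ComputeHash`, and `PutIfChanged` are hypothetical names, the in-memory dictionary stands in for the templates stored in the cluster, and SHA-256 is an assumed choice of hash function.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

// Hypothetical illustration only: none of these types come from the library.
public static class TemplateBootstrapSketch
{
    // Stands in for the cluster: template name -> hash of the body last written.
    private static readonly Dictionary<string, string> StoredHashes = new Dictionary<string, string>();

    // Assumption: SHA-256 over the UTF-8 template body serves as the content hash.
    public static string ComputeHash(string templateJson)
    {
        var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(templateJson));
        return Convert.ToHexString(bytes);
    }

    // Returns true when a PUT would be issued, false when it is skipped.
    public static bool PutIfChanged(string name, string templateJson)
    {
        var hash = ComputeHash(templateJson);
        if (StoredHashes.TryGetValue(name, out var existing) && existing == hash)
            return false; // same content hash: skip the PUT

        StoredHashes[name] = hash; // simulate the PUT and remember the new hash
        return true;
    }
}
```

Calling `PutIfChanged` twice with the same name and body writes once and skips the second call; changing the body produces a different hash, so the write happens again.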

To use manual rollover, compose a strategy that includes ManualRolloverStrategy and pass it via the IngestStrategy<T> constructor:

var strategy = new IngestStrategy<LogEntry>(
    LoggingContext.LogEntry.Context,
    BootstrapStrategies.DataStream(),
    new DataStreamIngestStrategy<LogEntry>("logs-myapp-production", "/_bulk"),
    new AlwaysCreateProvisioning(),
    new NoAliasStrategy(),
    new ManualRolloverStrategy()
);
var options = new IngestChannelOptions<LogEntry>(transport, strategy,
    LoggingContext.LogEntry.Context);
using var channel = new IngestChannel<LogEntry>(options);

// Rollover with conditions
await channel.RolloverAsync(maxAge: "7d", maxSize: "50gb");

// Unconditional rollover
await channel.RolloverAsync();

See rollover strategies and rollover API.