IncrementalSyncOrchestrator
IncrementalSyncOrchestrator<T> coordinates a full incremental sync workflow with a primary and secondary channel. It handles strategy selection, ingestion routing, alias swapping, reindexing, and cleanup.
When StartAsync is called, the orchestrator bootstraps both channels and determines whether to use reindex or multiplex mode:
flowchart TD
Start[StartAsync] --> PreBoot[Run pre-bootstrap tasks]
PreBoot --> Boot[Bootstrap primary channel]
Boot --> HashCheck{Primary template<br/>hash changed?}
HashCheck -->|Yes| MX[Multiplex mode]
HashCheck -->|No| AliasCheck{Secondary alias<br/>exists?}
AliasCheck -->|No| MX
AliasCheck -->|Yes| BootS[Bootstrap secondary channel]
BootS --> HashCheck2{Secondary template<br/>hash changed?}
HashCheck2 -->|Yes| MX
HashCheck2 -->|No| RX[Reindex mode]
- Reindex mode: both schemas are unchanged and the secondary index exists. Only the primary channel receives writes; the secondary is updated via _reindex after drain.
- Multiplex mode: a schema has changed or the secondary index doesn't exist. Both channels receive every document simultaneously.
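The decision in the flowchart reduces to three checks evaluated in order. The following is a minimal sketch of that logic only, not the orchestrator's implementation: `IngestStrategySketch`, `StrategySketch`, and the three boolean inputs are hypothetical stand-ins for the template hash comparisons and the alias lookup.

```csharp
// Hypothetical local mirror of IngestStrategy; only the two members named on this page.
public enum IngestStrategySketch { Reindex, Multiplex }

public static class StrategySketch
{
    // Mirrors the flowchart: any schema change or a missing secondary alias
    // selects multiplex mode; otherwise reindex mode is used.
    public static IngestStrategySketch Resolve(
        bool primaryHashChanged,
        bool secondaryAliasExists,
        bool secondaryHashChanged)
    {
        if (primaryHashChanged) return IngestStrategySketch.Multiplex;
        if (!secondaryAliasExists) return IngestStrategySketch.Multiplex;
        if (secondaryHashChanged) return IngestStrategySketch.Multiplex;
        return IngestStrategySketch.Reindex;
    }
}
```

Under these assumptions, `StrategySketch.Resolve(false, true, false)` is the only combination that yields reindex mode.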
sequenceDiagram
participant App
participant Orch as Orchestrator
participant P as Primary Channel
participant S as Secondary Channel
participant ES as Elasticsearch
App->>Orch: StartAsync()
Orch->>ES: Run pre-bootstrap tasks
Orch->>P: Create + Bootstrap
Orch->>ES: Compare template hash (matches)
Orch->>ES: HEAD secondary alias (exists)
Orch->>S: Create + Bootstrap
Orch-->>App: IngestStrategy.Reindex
loop Documents
App->>Orch: TryWrite(doc)
Orch->>P: TryWrite(doc)
Note right of Orch: Only primary receives writes
end
App->>Orch: CompleteAsync()
Orch->>P: Drain + Refresh + Apply aliases
Orch->>ES: _reindex (updates: last_updated >= batch timestamp)
Orch->>ES: _reindex + delete script (old docs)
Orch->>ES: _delete_by_query (cleanup primary)
Orch->>P: Alias swap
Orch->>S: Alias swap
Orch-->>App: complete
In reindex mode:
- Only the primary channel receives writes (better performance)
- After drain, documents modified since the batch timestamp are reindexed from primary to secondary
- Old documents (before the batch timestamp) are handled by a second _reindex call that carries a delete script
- Both channels get their aliases swapped
- Old documents are cleaned up from the primary
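To make the "reindex updates" step more concrete, here is a sketch of what a `_reindex` request body with a range filter on the last-updated field might look like. The orchestrator's actual request is not shown on this page; `ReindexBodySketch`, `BuildUpdatesBody`, and the index names passed to it are hypothetical. Only the `source` / `query` / `range` / `dest` shape comes from the Elasticsearch `_reindex` API.

```csharp
using System;
using System.Globalization;
using System.Text.Json.Nodes;

public static class ReindexBodySketch
{
    // Builds a _reindex body that copies documents whose lastUpdatedField
    // is at or after the batch timestamp from sourceIndex to destIndex.
    // Index names are supplied by the caller; nothing here is resolved from aliases.
    public static JsonObject BuildUpdatesBody(
        string sourceIndex,
        string destIndex,
        string lastUpdatedField,
        DateTimeOffset batchTimestamp)
    {
        return new JsonObject
        {
            ["source"] = new JsonObject
            {
                ["index"] = sourceIndex,
                ["query"] = new JsonObject
                {
                    ["range"] = new JsonObject
                    {
                        [lastUpdatedField] = new JsonObject
                        {
                            // ISO 8601 round-trip format, culture-independent
                            ["gte"] = batchTimestamp.ToString("o", CultureInfo.InvariantCulture)
                        }
                    }
                }
            },
            ["dest"] = new JsonObject { ["index"] = destIndex }
        };
    }
}
```

Calling `BuildUpdatesBody(...).ToJsonString()` gives a payload that could be posted to `_reindex`; passing "last_updated" as the field name matches the documented default of LastUpdatedField.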
sequenceDiagram
participant App
participant Orch as Orchestrator
participant P as Primary Channel
participant S as Secondary Channel
participant ES as Elasticsearch
App->>Orch: StartAsync()
Orch->>P: Bootstrap (hash changed)
Note right of Orch: Schema change detected
Orch->>S: Bootstrap
Orch-->>App: IngestStrategy.Multiplex
loop Documents
App->>Orch: TryWrite(doc)
Orch->>P: TryWrite(doc)
Orch->>S: TryWrite(doc)
Note right of Orch: Both channels receive writes
end
App->>Orch: CompleteAsync()
Orch->>P: Drain + Refresh + Apply aliases
Orch->>S: Drain + Refresh + Apply aliases
Orch->>ES: _delete_by_query (old primary docs)
Orch-->>App: complete
In multiplex mode:
- Both channels receive every document (handles schema differences)
- After drain, both channels are refreshed and aliases are applied
- Old documents are cleaned up from the primary
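The difference between the two modes at write time is only where each document is routed. A minimal in-memory sketch of that routing follows, with lists standing in for the real channels. `RoutingMode` and `WriteRouterSketch<T>` are hypothetical names used for illustration and are not part of the library.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for the resolved strategy.
public enum RoutingMode { Reindex, Multiplex }

public sealed class WriteRouterSketch<T>
{
    private readonly RoutingMode _mode;

    // Lists stand in for the primary and secondary ingest channels.
    public List<T> Primary { get; } = new List<T>();
    public List<T> Secondary { get; } = new List<T>();

    public WriteRouterSketch(RoutingMode mode) => _mode = mode;

    // Reindex mode: primary only. Multiplex mode: both channels.
    public bool TryWrite(T doc)
    {
        Primary.Add(doc);
        if (_mode == RoutingMode.Multiplex)
            Secondary.Add(doc);
        return true;
    }
}
```

In reindex mode the secondary list stays empty during ingestion, which corresponds to the secondary index being brought up to date only after drain.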
var orchestrator = new IncrementalSyncOrchestrator<TEvent>(
ITransport transport,
ElasticsearchTypeContext primary,
ElasticsearchTypeContext secondary
);
| Property | Type | Default | Description |
|---|---|---|---|
| `LastUpdatedField` | `string` | `"last_updated"` | Elasticsearch field name used for range queries when reindexing updates |
| `BatchIndexDateField` | `string` | `"batch_index_date"` | Elasticsearch field name used for range queries when deleting old documents |
| `ConfigurePrimary` | `Action<IngestChannelOptions<T>>?` | `null` | Callback to customize primary channel options before creation |
| `ConfigureSecondary` | `Action<IngestChannelOptions<T>>?` | `null` | Callback to customize secondary channel options before creation |
| `OnPostComplete` | `Func<OrchestratorContext<T>, CancellationToken, Task>?` | `null` | Hook that runs after CompleteAsync finishes all operations |
| `Strategy` | `IngestStrategy` | Read-only | The resolved strategy after StartAsync |
| `BatchTimestamp` | `DateTimeOffset` | Read-only | Timestamp captured at orchestrator creation |
| Method | Description |
|---|---|
| `AddPreBootstrapTask(Func<ITransport, CancellationToken, Task>)` | Add a task that runs before channel bootstrap (returns this for chaining) |
| `StartAsync(BootstrapMethod, string?, CancellationToken)` | Bootstrap channels and determine strategy |
| `TryWrite(TEvent)` | Non-blocking write (routes based on strategy) |
| `WaitToWriteAsync(TEvent, CancellationToken)` | Async write with backpressure |
| `TryWriteMany(IEnumerable<TEvent>)` | Batch non-blocking write |
| `WaitToWriteManyAsync(IEnumerable<TEvent>, CancellationToken)` | Batch async write |
| `CompleteAsync(TimeSpan?, CancellationToken)` | Drain, refresh, reindex/multiplex, alias swap, cleanup |
| `Dispose()` | Dispose primary and secondary channels |
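The split between a non-blocking write and an awaitable write with backpressure is the same one found in `System.Threading.Channels`. The sketch below shows that general pattern against the BCL `ChannelWriter<T>`, not against the orchestrator. The return types of the orchestrator's methods are not listed on this page, and its `WaitToWriteAsync` takes the document itself, so treat this as an analogy under those assumptions. `BackpressureSketch` is a hypothetical helper.

```csharp
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackpressureSketch
{
    // Tries the non-blocking path first and only awaits free capacity
    // when the bounded buffer is full. Returns false if the writer was completed.
    public static async Task<bool> WriteWithBackpressureAsync<T>(
        ChannelWriter<T> writer,
        T item,
        CancellationToken ct = default)
    {
        while (!writer.TryWrite(item))
        {
            // WaitToWriteAsync yields false once the channel no longer accepts writes.
            if (!await writer.WaitToWriteAsync(ct).ConfigureAwait(false))
                return false;
        }
        return true;
    }
}
```

The point of the pattern is that a full buffer slows the producer down instead of silently dropping documents, which is the trade-off to weigh when choosing between the non-blocking and the awaitable write methods.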
This example mirrors examples/Elastic.Ingest.MultiChannel/:
// Document type
public class KnowledgeArticle
{
[Id] [Keyword]
public string Url { get; set; }
[Text(Analyzer = "standard")]
public string Title { get; set; }
[Text(Analyzer = "standard")]
public string Body { get; set; }
[ContentHash] [Keyword]
public string Hash { get; set; }
[Timestamp] [JsonPropertyName("@timestamp")]
public DateTimeOffset UpdatedAt { get; set; }
}
// Mapping context with two variants
[ElasticsearchMappingContext]
[Entity<KnowledgeArticle>(
Target = EntityTarget.Index,
Name = "knowledge-lexical",
WriteAlias = "knowledge-lexical",
ReadAlias = "knowledge-lexical-search",
SearchPattern = "knowledge-lexical-*",
DatePattern = "yyyy.MM.dd.HHmmss"
)]
[Entity<KnowledgeArticle>(
Target = EntityTarget.Index,
Name = "knowledge-semantic",
Variant = "Semantic",
WriteAlias = "knowledge-semantic",
ReadAlias = "knowledge-semantic-search",
SearchPattern = "knowledge-semantic-*",
DatePattern = "yyyy.MM.dd.HHmmss"
)]
public static partial class ExampleMappingContext;
var transport = new DistributedTransport(
new TransportConfiguration(new Uri("http://localhost:9200"))
);
using var orchestrator = new IncrementalSyncOrchestrator<KnowledgeArticle>(
transport,
primary: ExampleMappingContext.KnowledgeArticle.Context,
secondary: ExampleMappingContext.KnowledgeArticleSemantic.Context
);
// Optional: configure channel options
orchestrator.ConfigurePrimary = opts =>
{
opts.BufferOptions = new BufferOptions { OutboundBufferMaxSize = 5_000 };
};
// Optional: add pre-bootstrap tasks
orchestrator.AddPreBootstrapTask(async (t, ctx) =>
{
// Create synonym sets, query rules, etc.
});
// Optional: add post-complete hook
orchestrator.OnPostComplete = async (context, ctx) =>
{
Console.WriteLine($"Strategy: {context.Strategy}, Batch: {context.BatchTimestamp}");
};
// Start: bootstrap and determine strategy
var strategy = await orchestrator.StartAsync(BootstrapMethod.Failure);
Console.WriteLine($"Using strategy: {strategy}");
// Write documents
foreach (var article in await GetArticles())
orchestrator.TryWrite(article);
// Complete: drain, reindex/multiplex, alias swap, cleanup
var success = await orchestrator.CompleteAsync(drainMaxWait: TimeSpan.FromSeconds(30));
Console.WriteLine($"Completed: {success}");
- Catalog data: use case guide for catalog data with orchestration
- Provisioning strategies: hash-based index reuse
- Alias strategies: alias swapping behavior