
Incremental sync with semantic enrichment

This use case shows how to maintain a lexical search index and a semantic search index over the same data, using IncrementalSyncOrchestrator to coordinate both channels. The semantic index uses an ELSER inference endpoint for embedding generation.

Use this approach when:

  • You have reference data (articles, products, documentation) that needs both keyword and semantic search
  • You want zero-downtime schema changes with automatic reindex-vs-multiplex detection
  • You need coordinated alias swapping across multiple indices

Define the document type:

public class Article
{
    [Id]
    [Keyword]
    public string Url { get; set; }

    [Text(Analyzer = "standard")]
    public string Title { get; set; }

    [Text(Analyzer = "standard")]
    public string Body { get; set; }

    [ContentHash]
    [Keyword]
    public string Hash { get; set; }

    [Timestamp]
    [JsonPropertyName("@timestamp")]
    public DateTimeOffset UpdatedAt { get; set; }
}
		

Define two index configurations -- lexical and semantic -- using the Variant parameter:

[ElasticsearchMappingContext]
[Entity<Article>(
    Target = EntityTarget.Index,
    Name = "articles-lexical",
    WriteAlias = "articles-lexical",
    ReadAlias = "articles-lexical-search",
    SearchPattern = "articles-lexical-*",
    DatePattern = "yyyy.MM.dd.HHmmss"
)]
[Entity<Article>(
    Target = EntityTarget.Index,
    Name = "articles-semantic",
    Variant = "Semantic",
    WriteAlias = "articles-semantic",
    ReadAlias = "articles-semantic-search",
    SearchPattern = "articles-semantic-*",
    DatePattern = "yyyy.MM.dd.HHmmss"
)]
public static partial class ArticleContext;
		

This generates:

  • ArticleContext.Article.Context -- lexical index
  • ArticleContext.ArticleSemantic.Context -- semantic index
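
The DatePattern and SearchPattern values suggest that each run writes to a timestamped concrete index behind the aliases. As a rough illustration only, the sketch below assumes the concrete name is a prefix plus the batch timestamp formatted with DatePattern. The exact composition is up to the library and is not confirmed here; IndexNameSketch and Compose are hypothetical names.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, not part of the library.
public static class IndexNameSketch
{
    // Assumption: concrete index name = "<prefix>-<timestamp formatted with DatePattern>".
    public static string Compose(string prefix, string datePattern, DateTimeOffset batchTimestamp) =>
        $"{prefix}-{batchTimestamp.UtcDateTime.ToString(datePattern, CultureInfo.InvariantCulture)}";
}
```

Under that assumption, a run at 2025-01-15 10:30:00 UTC with the prefix articles-lexical would yield articles-lexical-2025.01.15.103000, which the articles-lexical-* search pattern matches.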
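
Create the orchestrator with the lexical context as the primary channel and the semantic context as the secondary:

var transport = new DistributedTransport(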
    new TransportConfiguration(new Uri("http://localhost:9200"))
);

using var orchestrator = new IncrementalSyncOrchestrator<Article>(
    transport,
    primary: ArticleContext.Article.Context,
    secondary: ArticleContext.ArticleSemantic.Context
);
		

Use a pre-bootstrap task to create the ELSER inference endpoint before the semantic index template is created:

orchestrator.AddPreBootstrapTask(async (transport, ctx) =>
{
    // Create ELSER inference endpoint for semantic embeddings
    var body = """
    {
        "service": "elser",
        "service_settings": {
            "num_allocations": 1,
            "num_threads": 2
        }
    }
    """;
    await transport.PutAsync<StringResponse>(
        "_inference/sparse_embedding/article-elser",
        PostData.String(body), cancellationToken: ctx);
});
		

Alternatively, add the inference endpoint as a bootstrap step on the secondary channel:

orchestrator.ConfigureSecondary = opts =>
{
    // The secondary channel needs a custom bootstrap that includes the inference endpoint
};
		

Start the orchestrator, write the documents, then complete the sync:

var strategy = await orchestrator.StartAsync(BootstrapMethod.Failure);
Console.WriteLine($"Strategy: {strategy}");

// Write documents -- the orchestrator routes to the right channels
foreach (var article in await GetArticlesFromSource())
    orchestrator.TryWrite(article);

// Drain, reindex/multiplex, alias swap, cleanup
var success = await orchestrator.CompleteAsync(drainMaxWait: TimeSpan.FromSeconds(60));
		

Reindex or Multiplex

The orchestrator automatically chooses between:

  • Reindex mode: used when both index schemas are unchanged (the template hashes match). Only the primary channel receives writes; after the drain, documents are reindexed into the secondary. This is faster because each document is serialized and sent only once.
  • Multiplex mode: used when a schema has changed or the secondary index doesn't exist. Both channels receive every document, so each index is written directly under its own schema.
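
The choice between the two modes can be modeled as a small decision function. This is a minimal sketch of the rule as described above, not the orchestrator's actual implementation; SyncStrategy, StrategySketch, and the three boolean inputs are hypothetical names introduced for illustration.

```csharp
using System;

// Hypothetical types: they model the documented rule, not the library's internals.
public enum SyncStrategy { Reindex, Multiplex }

public static class StrategySketch
{
    // Reindex only when the secondary index exists and neither template hash changed;
    // any schema change or a missing secondary index falls back to Multiplex.
    public static SyncStrategy Choose(
        bool primaryTemplateChanged,
        bool secondaryTemplateChanged,
        bool secondaryIndexExists)
    {
        if (primaryTemplateChanged || secondaryTemplateChanged || !secondaryIndexExists)
            return SyncStrategy.Multiplex;

        return SyncStrategy.Reindex;
    }
}
```

In this model, Multiplex is the safe default: Reindex is picked only when every precondition for copying documents unchanged into the secondary holds.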

Post-completion hook

Track sync results or trigger downstream processes:

orchestrator.OnPostComplete = async (context, ctx) =>
{
    Console.WriteLine($"Strategy used: {context.Strategy}");
    Console.WriteLine($"Batch timestamp: {context.BatchTimestamp}");
    // Notify search service to refresh caches, update metrics, etc.
};