Examples v7

Pipelines are the primary way to define AI workflows in EDB Postgres® AI. Each pipeline consists of one or more sequential steps where data flows linearly: the output of step 1 becomes the input for step 2, and so on.

Creating a pipeline (table source)

To create a pipeline using a standard Postgres table as your data source, use the aidb.create_pipeline() function. The following examples show how to set up both a single-step and a multi-step pipeline.

Single-step pipeline

This example creates a knowledge base from a Postgres table. After running the pipeline, you can query the resulting embeddings for semantic search.

SELECT aidb.create_pipeline(
  name => 'kb_pipeline_table',
  source => 'source_table',
  source_key_column => 'id',
  source_data_column => 'content',
  step_1 => 'KnowledgeBase',
  step_1_options => aidb.knowledge_base_config(
     model => 'bert',  -- this is a pre-defined locally running model
     data_format => 'Text'
   )
);

Multi-step pipeline

You can chain multiple operations. This example first parses HTML and then chunks the resulting text.

SELECT aidb.create_pipeline(
   name => 'html_processing_pipeline',
   source => 'web_data_table',
   source_key_column => 'id',
   source_data_column => 'html_content',
   step_1 => 'ParseHtml',
   step_2 => 'ChunkText'
);

Creating a pipeline (volume source)

If your data lives in external storage (like an S3 bucket), you must first define a storage location and a volume.

Step 1: Define storage and volume

Create the PGFS storage location and AIDB volume.

SELECT pgfs.create_storage_location('s3_bucket_location', 's3://my-ai-data',
 options => '{"region": "us-east-1", "skip_signature": "true"}'
 )
SELECT aidb.create_volume('source_volume', 's3_bucket_location', '/', 'Text');

Step 2: Create the pipeline

SELECT aidb.create_pipeline(
   name => 'pipeline_from_s3',
   source => 'source_volume',
   step_1 => 'KnowledgeBase',
   step_1_options => aidb.knowledge_base_config(
     model => 'dummy',
     data_format => 'Text'
   )
);

Running and updating pipelines

Pipelines are disabled by default upon creation. You can trigger them manually or enable automatic processing.

Manual execution

Run the Pipeline once on all existing data:

SELECT aidb.run_pipeline('kb_pipeline_table');

Enable auto-processing

Keep the pipeline up-to-date as source data changes:

SELECT aidb.update_pipeline(
  name => 'kb_pipeline_table',
  auto_processing => 'Live'
);

Monitoring and deletion

To view your existing pipelines and their configurations, query the aidb.pipelines view:

SELECT * FROM aidb.pipelines;
Output
   name      | source_type | source_schema |       source       | source_key_column | source_data_column | destination_type | destination_schema |       destination       | destination_key_column | destination_data_column |                                                 steps                                                  | auto_processing | batch_size | background_sync_interval |              owner_role              
----------------+-------------+---------------+--------------------+-------------------+--------------------+------------------+--------------------+-------------------------+------------------------+-------------------------+--------------------------------------------------------------------------------------------------------+-----------------+------------+--------------------------+--------------------------------------
pipeline__8989 | Table       | public        | source_table__8989 | id                | content            | Table            | public             | pipeline_pipeline__8989 | source_id              | value                   | [{"options": {"max_length": null, "desired_length": 1000}, "operation": "ChunkText", "step_order": 1}] | Live            |         42 | @ 42 mins                | role_pipeline_management_single_step
(1 row)
SELECT name, source, auto_processing FROM aidb.pipelines;
Output
  name       |     source.         | auto_processing
----------------+-------------------+-----------------
pipeline__8989 | source_table__8989 | Live
(1 row)

Deleting a pipeline will also drop any destination tables generated by the Pipeline steps. To delete a Pipeline:

SELECT aidb.delete_pipeline('kb_pipeline_table');

End-to-end example: Multi-step pipeline with intermediate storage

This end-to-end example demonstrates how to configure a multi-step pipeline in EDB Postgres AI that utilizes intermediate storage.

By defining an intermediate_destination, you can persist the results of individual steps (like text chunking) before they pass to the next stage (like summarization), which is useful for debugging or reusing processed data.

Key features of this example:

  • Multi-step workflow: Chains ChunkText (Step 1) into SummarizeText (Step 2).

  • Auto-processing: The pipeline automatically triggers whenever new data is inserted into the source table.

  • Intermediate storage: Demonstrates how to explicitly name an intermediate table versus allowing the system to auto-generate one.

Example 1: Named intermediate destination

In this scenario, we will manually name the table that stores the output of the chunking step.

  • Create a source table for your raw text:

    CREATE TABLE source_table_demo(
      id INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
      content TEXT NOT NULL
    );
  • Define the pipeline with a custom intermediate table:

    SELECT * from aidb.create_pipeline(
      name => 'complex_pipeline_v1',
      source => 'source_table_demo',
      source_key_column => 'id',
      source_data_column => 'content',
      auto_processing => 'Live',
      step_1 => 'ChunkText',
      step_1_options => jsonb_build_object(
          'desired_length', 20,
          'intermediate_destination', jsonb_build_object(
              'enabled', true,
              'destination', 'my_custom_chunk_storage' -- Explicitly named
          )
      ),
      step_2 => 'SummarizeText'
     );
  • Insert data to trigger auto-processing:

    INSERT INTO source_table_demo (content)
    VALUES ('This is a long text example that will be split into segments for easier processing.');
  • View the chunks and final summaries:

    -- View intermediate chunked output
    SELECT * FROM my_custom_chunk_storage;
    
    -- View the final summarized output
    SELECT * FROM pipeline_complex_pipeline_v1;

Example 2: Automatic intermediate destination

If you enable intermediate storage but do not provide a name, the system generates a table following the pattern pipeline_[name]_step_[n].

  • Create a pipeline with auto-named intermediate storage:

    SELECT * from aidb.create_pipeline(
      name => 'auto_named_pipeline',
      source => 'source_table_demo_2',
      source_key_column => 'id',
      source_data_column => 'content',
      auto_processing => 'Live',
      step_1 => 'ChunkText',
      step_1_options => jsonb_build_object(
          'desired_length', 20,
          'intermediate_destination', jsonb_build_object('enabled', true)
      ),
      step_2 => 'SummarizeText'
    );
  • Access the auto-generated intermediate table:

    -- View intermediate chunked output (auto-generated table name)
    SELECT * FROM pipeline_auto_named_pipeline_step_1;

End-to-end example: Scanned PDFs to a searchable knowledge base

This example shows how to build a full pipeline that turns a collection of scanned PDF documents into a vector-searchable knowledge base. It uses three sequential steps:

  1. PdfToImage - renders each PDF page as an image.

  2. PerformOcr - extracts text from each rendered image using an OCR model.

  3. KnowledgeBase - embeds the extracted text and stores it for semantic search.

This pattern is ideal for scanned documents, legacy PDFs without a text layer, and any content where ParsePdf returns empty or low-quality text. By leveraging intermediate storage, you can inspect the OCR output before it feeds into the knowledge base, ensuring data quality and debugging any issues in the workflow.

Prerequisites

You need a registered NVIDIA NIM PaddleOCR model and a registered embedding model. If you don't have these yet, register them first:

-- Register the OCR model
SELECT aidb.create_model(
    'my_paddle_ocr_model',
    'nim_paddle_ocr',
    credentials => '{"api_key": "<NVIDIA_NIM_API_KEY>"}'::JSONB
);

The embedding model (bert) is pre-installed and needs no registration. If you're using a different model, register it here.

Step 1: Create the source table

Create a table to hold the raw PDF data:

CREATE TABLE scanned_pdfs (
    id      INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    name    TEXT NOT NULL,
    pdf_data BYTEA NOT NULL
);

Step 2: Load PDFs into the table

Insert PDF documents as binary data. The example below uses pg_read_binary_file() to load files from the server's file system:

INSERT INTO scanned_pdfs (name, pdf_data)
VALUES ('quarterly_report.pdf', pg_read_binary_file('/path/to/quarterly_report.pdf'));

You can also load PDFs from an S3-compatible volume — see External storage and Creating a pipeline (volume source).

Step 3: Create the pipeline

SELECT aidb.create_pipeline(
    name               => 'scanned_pdf_kb',
    source             => 'scanned_pdfs',
    source_key_column  => 'id',
    source_data_column => 'pdf_data',
    step_1             => 'PdfToImage',
    step_1_options     => '{"dpi": 150}'::jsonb,
    step_2             => 'PerformOcr',
    step_2_options     => aidb.ocr_config(model => 'my_paddle_ocr_model'),
    step_3             => 'KnowledgeBase',
    step_3_options     => aidb.knowledge_base_config(
        model             => 'bert',
        data_format       => 'Text',
        distance_operator => 'Cosine'
    ),
    auto_processing    => 'Live'
);

With auto_processing set to Live, the pipeline will automatically run whenever new PDFs are inserted into the scanned_pdfs table.

Step 4: Run the pipeline on existing data

If you inserted rows before enabling the pipeline, process them now:

SELECT aidb.run_pipeline('scanned_pdf_kb');

Step 5: Query the knowledge base

After the pipeline finishes, query the results using semantic search:

SELECT * FROM aidb.retrieve_text(
    'scanned_pdf_kb',
    'quarterly revenue figures',
    5  -- return top 5 matches
);

Each row in the result corresponds to a text block extracted from a PDF page, ranked by semantic similarity to your query.