Run a PostgreSQL batch query.

yaml
type: "io.kestra.plugin.jdbc.postgresql.Batch"

Fetch rows from a table and bulk insert them into another one.

yaml
id: postgres_bulk_insert
namespace: company.team

tasks:
  - id: query
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://dev:5432/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    sql: |
      SELECT *
      FROM xref
      LIMIT 1500;
    fetchType: STORE

  - id: update
    type: io.kestra.plugin.jdbc.postgresql.Batch
    from: "{{ outputs.query.uri }}"
    url: jdbc:postgresql://prod:5433/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    sql: |
      insert into xref values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )

Fetch rows from a table and bulk insert them into another one, without writing a SQL query.

yaml
id: postgres_bulk_insert
namespace: company.team

tasks:
  - id: query
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://dev:5432/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    sql: |
      SELECT *
      FROM xref
      LIMIT 1500;
    fetchType: STORE

  - id: update
    type: io.kestra.plugin.jdbc.postgresql.Batch
    from: "{{ outputs.query.uri }}"
    url: jdbc:postgresql://prod:5433/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    table: xref

Use Postgres Batch to bulk insert rows

yaml
  id: postgres_batch
  namespace: company.team

  tasks:
    - id: download_products_csv_file
      type: io.kestra.plugin.core.http.Download
      uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv

    - id: products_csv_to_ion
      type: io.kestra.plugin.serdes.csv.CsvToIon
      from: "{{ outputs.download_products_csv_file.uri }}"

    - id: postgres_create_table
      type: io.kestra.plugin.jdbc.postgresql.Query
      url: "jdbc:postgresql://{{ secret('POSTGRES_HOST') }}:5432/postgres"
      username: "{{ secret('POSTGRES_USERNAME') }}"
      password: "{{ secret('POSTGRES_PASSWORD') }}"
      sql: |
        CREATE TABLE IF NOT EXISTS products(
          product_id varchar(5),
          product_name varchar(100),
          product_category varchar(50),
          brand varchar(50)
        )

    - id: postgres_batch_insert
      type: io.kestra.plugin.jdbc.postgresql.Batch
      url: "jdbc:postgresql://{{ secret('POSTGRES_HOST') }}:5432/postgres"
      username: "{{ secret('POSTGRES_USERNAME') }}"
      password: "{{ secret('POSTGRES_PASSWORD') }}"
      from: "{{ outputs.products_csv_to_ion.uri }}"
      sql: |
        insert into products values (?, ?, ?, ?)

Properties

Source file URI

The JDBC URL to connect to the database.

Default 1000

The chunk size for each bulk request.
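
As a rough illustration of the chunk size, the flow below is a minimal sketch that lowers it from the default of 1000 to 500. The property name `chunk` is an assumption, since the listing above does not show property names; the hosts, table, and secrets are the ones used in the earlier examples. With the 1500 rows fetched here, the insert would be sent as three bulk requests.

```yaml
id: postgres_bulk_insert_chunked
namespace: company.team

tasks:
  - id: query
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://dev:5432/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    sql: |
      SELECT *
      FROM xref
      LIMIT 1500;
    fetchType: STORE

  - id: update
    type: io.kestra.plugin.jdbc.postgresql.Batch
    from: "{{ outputs.query.uri }}"
    url: jdbc:postgresql://prod:5433/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    # Assumed property name: chunk. Send 500 rows per bulk request
    # instead of the default 1000.
    chunk: 500
    table: xref
```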

SubType string

The columns to be inserted.

If not provided, the number of ? placeholders must match the number of columns in the from file.

The database user's password.

Insert query to be executed.

The query must have as many question marks as the number of columns in the table. Example: 'insert into <table_name> values( ? , ? , ? )' for 3 columns. If you do not want to insert into all columns, name the target columns in the query and list them in the columns property. Example: 'insert into <table_name> (id, name) values( ? , ? )' for inserting data into 2 columns: 'id' and 'name'.
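
The partial-column case can be sketched as a flow. This is an illustration under assumptions, not a confirmed recipe: it assumes the columns property is a list of strings named `columns`, that the `products` table from the earlier example already exists, and that the CSV header names match the table's column names.

```yaml
id: postgres_batch_partial_columns
namespace: company.team

tasks:
  - id: download_products_csv_file
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv

  - id: products_csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.download_products_csv_file.uri }}"

  - id: postgres_batch_insert
    type: io.kestra.plugin.jdbc.postgresql.Batch
    url: "jdbc:postgresql://{{ secret('POSTGRES_HOST') }}:5432/postgres"
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    from: "{{ outputs.products_csv_to_ion.uri }}"
    # Assumed property name and shape: a list of source columns to bind,
    # in the same order as the ? placeholders below.
    columns:
      - product_id
      - product_name
    sql: |
      insert into products (product_id, product_name) values (?, ?)
```

The two placeholders line up with the two listed columns; the remaining table columns (`product_category`, `brand`) would be left at their defaults.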

Default false

Is the connection SSL?

The SSL cert.

Must be a PEM encoded certificate

The SSL key.

Must be a PEM encoded key

The SSL key password.

Possible Values
DISABLE, ALLOW, PREFER, REQUIRE, VERIFY_CA, VERIFY_FULL

The SSL mode.

The SSL root cert.

Must be a PEM encoded certificate
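
To show how the SSL settings might fit together, here is a minimal sketch of a Batch task that verifies the server certificate. The property names `ssl`, `sslMode`, and `sslRootCert` are assumptions, since the listing above gives only descriptions, and the secret `POSTGRES_ROOT_CERT` is hypothetical; it is expected to hold a PEM encoded certificate.

```yaml
id: postgres_bulk_insert_ssl
namespace: company.team

tasks:
  - id: query
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://dev:5432/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    sql: |
      SELECT *
      FROM xref
      LIMIT 1500;
    fetchType: STORE

  - id: update
    type: io.kestra.plugin.jdbc.postgresql.Batch
    from: "{{ outputs.query.uri }}"
    url: jdbc:postgresql://prod:5433/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    # Assumed property names; values follow the descriptions above.
    ssl: true
    sslMode: VERIFY_FULL
    # Hypothetical secret holding the PEM encoded root certificate.
    sslRootCert: "{{ secret('POSTGRES_ROOT_CERT') }}"
    table: xref
```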

The table from which column names will be retrieved.

This property specifies the table whose column names are used for the inserted values. You can use it instead of listing the columns manually in the columns property. In this case, the sql property can also be omitted; an INSERT statement is then generated automatically.

The time zone id to use for date/time manipulation. Default value is the worker's default time zone id.
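
A short sketch of pinning the time zone rather than relying on the worker's default follows. The property name `timeZoneId` is an assumption, and `Europe/Paris` is only an example zone id.

```yaml
id: postgres_bulk_insert_timezone
namespace: company.team

tasks:
  - id: query
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://dev:5432/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    sql: |
      SELECT *
      FROM xref
      LIMIT 1500;
    fetchType: STORE

  - id: update
    type: io.kestra.plugin.jdbc.postgresql.Batch
    from: "{{ outputs.query.uri }}"
    url: jdbc:postgresql://prod:5433/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    # Assumed property name: timeZoneId. Example value only.
    timeZoneId: Europe/Paris
    table: xref
```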

The database user.

The rows count.

The updated rows count.
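
The two counts could be read by a later task, as in the sketch below. The output names `rowCount` and `updatedCount` are assumptions, since the listing gives only their descriptions, and the `log_counts` task is hypothetical.

```yaml
id: postgres_bulk_insert_with_counts
namespace: company.team

tasks:
  - id: query
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://dev:5432/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    sql: |
      SELECT *
      FROM xref
      LIMIT 1500;
    fetchType: STORE

  - id: update
    type: io.kestra.plugin.jdbc.postgresql.Batch
    from: "{{ outputs.query.uri }}"
    url: jdbc:postgresql://prod:5433/
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    table: xref

  # Hypothetical follow-up task; output names are assumed.
  - id: log_counts
    type: io.kestra.plugin.core.log.Log
    message: "Rows: {{ outputs.update.rowCount }}, updated: {{ outputs.update.updatedCount }}"
```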