Benthos is an open-source service for easily and declaratively creating stream processing and ETL pipelines. It supports a wide range of data sources, external processors, and data sinks, including bit.io!

Connecting to bit.io with Benthos

To connect to bit.io with Benthos, you'll need the PostgreSQL connection credentials from the Connect tab of the bit.io database you want to connect to. You can find everything you need to know about your PostgreSQL credentials here. To connect Benthos to bit.io, we only need the connection string.

Benthos supports 3 different kinds of SQL components, allowing it to integrate with bit.io in many different ways. Luckily, the configuration required for all of these different components is largely the same, and only the bit.io connection string is needed. Let's go through them individually:

sql_select

This component is supported as both an input and a processor. As an input, it constructs a query from the configuration and returns the result as an array of messages, one for each row. As a processor, it works mostly the same, except the generated query can be parametrized using messages returned from an input. A basic input/processor configuration using sql_select and bit.io looks like:

label: ""
sql_select:
  driver: postgres
  dsn: <your-bitio-connection-string>
  table: <your-table>
  columns: <list-of-columns-to-return>
  where: <optional-where-clause>
  args_mapping: <optional-param-mapping>

For other configuration options, consult the Benthos docs.
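To make the processor form concrete, here is a minimal sketch that looks up the stored view count for the path carried by each incoming message. It assumes the page_views table created later in this guide, and it reads the connection string from an environment variable named BITIO_DSN, which is a hypothetical name chosen for illustration. The where clause uses a question mark placeholder, which Benthos converts to dollar syntax for the postgres driver.

```yaml
pipeline:
  processors:
    - sql_select:
        driver: postgres
        # BITIO_DSN is a hypothetical environment variable holding
        # your bit.io connection string.
        dsn: ${BITIO_DSN}
        table: page_views
        columns: [ path, count ]
        # One placeholder, filled from args_mapping below.
        where: "path = ?"
        # Must evaluate to an array with one value per placeholder.
        args_mapping: 'root = [ this.path ]'
```

With this configuration, the message contents are replaced by an array of row objects, so a message like {"path":"/about"} should come out as a one-element array holding that path and its count.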

sql_insert

This component is supported as both a processor and an output. In both cases, it inserts a row into a SQL database for each message, leaving the message unchanged. A basic processor/output configuration using sql_insert and bit.io looks like:

label: ""
sql_insert:
  driver: postgres
  dsn: <your-bitio-connection-string>
  table: <your-table>
  columns: <list-of-columns-to-insert>
  args_mapping: <mapping-of-message-fields-to-columns>

For other configuration options, consult the Benthos docs.
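As an illustration, the following sketch uses sql_insert as an output that appends one row per message. The table page_view_log, its columns path and referrer, and the environment variable BITIO_DSN are all hypothetical names, not part of the original guide; substitute your own.

```yaml
output:
  sql_insert:
    driver: postgres
    # BITIO_DSN is a hypothetical environment variable holding
    # your bit.io connection string.
    dsn: ${BITIO_DSN}
    # Hypothetical table with two text columns.
    table: page_view_log
    columns: [ path, referrer ]
    # Values are listed in the same order as the columns above.
    args_mapping: 'root = [ this.path, this.referrer ]'
```

The array produced by args_mapping needs one value per entry in columns, in matching order.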

sql_raw

This component is supported as both a processor and an output. In both cases, it allows you to execute an arbitrary SQL query for each message. A basic processor/output configuration using sql_raw and bit.io looks like:

label: ""
sql_raw:
  driver: postgres
  dsn: <your-bitio-connection-string>
  query: <your-query>
  args_mapping: <param-mapping-for-query>

For other configuration options, consult the Benthos docs.
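For a concrete case, this sketch runs an aggregate query from a sql_raw processor. The table page_view_log and the environment variable BITIO_DSN are hypothetical names used only for illustration. Because the query is passed through as written, it uses PostgreSQL's own $1 placeholder syntax.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: postgres
        # BITIO_DSN is a hypothetical environment variable holding
        # your bit.io connection string.
        dsn: ${BITIO_DSN}
        # Count the logged views for the path in the current message.
        query: 'SELECT count(*) AS views FROM page_view_log WHERE path = $1'
        args_mapping: 'root = [ this.path ]'
```

For statements that return no useful rows, such as inserts or updates, the processor's exec_only field can be set to true so the message passes through unchanged.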

Using Benthos with bit.io

Let's set up a working data pipeline using Benthos and bit.io! In our example, we'll use our bit.io database to count page-view events for a hypothetical website, and log our page views using Benthos. To get started, let's first create a table for tracking page views in our bit.io database:

CREATE TABLE page_views (
  "path" TEXT UNIQUE,
  "count" INTEGER NOT NULL DEFAULT 1
);

Next, let's write a simple Benthos configuration to upsert records sent to Benthos via HTTP POST into our bit.io database:

http:
  enabled: true
  address: 0.0.0.0:4195
  root_path: /benthos
  debug_endpoints: false

input:
  label: ""
  http_server:
    path: /post

pipeline:
  threads: 1
  processors: []
        
output:
  label: ""
  sql_raw:
    driver: postgres
    dsn: <your-bitio-connection-string>
    query: >-
      INSERT INTO page_views ("path") VALUES ($1)
      ON CONFLICT ("path") DO
        UPDATE SET count = page_views.count + 1;
    args_mapping: |
      root = [
        this.path,
      ]

Run Benthos:

$ benthos -c config.yaml

And make some requests:

$ for i in {1..10}; do curl -X POST http://0.0.0.0:4195/post -d '{"path":"/index"}'; done
$ for i in {1..20}; do curl -X POST http://0.0.0.0:4195/post -d '{"path":"/"}'; done
$ for i in {1..5}; do curl -X POST http://0.0.0.0:4195/post -d '{"path":"/about"}'; done
$ for i in {1..14}; do curl -X POST http://0.0.0.0:4195/post -d '{"path":"/robots.txt"}'; done

As we can see, the page view records were populated in our database as expected:

[Image: query results showing the populated page_views table]
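If you would rather check the results from Benthos itself, one option is a second, separate configuration that reads the table back with a sql_select input and prints each row to stdout. This is a sketch rather than part of the original walkthrough, and BITIO_DSN is a hypothetical environment variable name for your connection string.

```yaml
input:
  sql_select:
    driver: postgres
    # BITIO_DSN is a hypothetical environment variable holding
    # your bit.io connection string.
    dsn: ${BITIO_DSN}
    table: page_views
    columns: [ path, count ]

output:
  # Print each row as one JSON document per line.
  stdout: {}
```

Run it the same way as the main pipeline, pointing -c at this file. The input shuts down once every row has been read, so Benthos should exit on its own after printing one line per path.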
