Benthos
Benthos is an open-source service for easily and declaratively creating stream processing and ETL pipelines. It supports a wide range of data sources, external processors, and data sinks, including bit.io!
Connecting to bit.io with Benthos
In order to connect to bit.io with Benthos, you'll need to find your PosgreSQL connection credentials from the bit.io Connect Tab of the database to which you would like to connect. You can find everything you need to know about your PostgreSQL credentials here. To connect to Benthos, we only need the connection string.
Benthos supports 3 different kinds of SQL components, allowing it to integrate with bit.io in many different ways. Luckily, the configuration required for all of these different components is largely the same, and only the bit.io connection string is needed. Let's go through them individually:
sql_select
sql_select
This component is supported as both an input and a processor. As an input, it constructs a query from the configuration and returns the result as an array of messages, one for each row. As a processor, it works mostly the same, except the generated query can be parametrized using messages returned from an input. A basic input/processor configuration using sql_select
and bit.io looks like:
label: ""
sql_select:
driver: postgres
dsn: <your-bitio-connection-string>
table: <your-table>
columns: <list-of-columns-to-return>
where: <optional-where-clause>
args_mapping: <optional-param-mapping>
for other configuration options, consult the Benthos docs.
sql_insert
sql_insert
This component is supported as both a processor and an output. In both cases, it inserts a row into a SQL database for each message, leaving the message unchanged. A basic processor/output configuration using sql_insert
and bit.io looks like:
label: ""
sql_insert:
driver: postgres
dsn: <your-bitio-connection-string>
table: <your-table>
columns: <list-of-columns-to-insert>
args_mapping: <mapping-of-message-fields-to-columns>
for other configuration options, consult the Benthos docs.
sql_raw
sql_raw
This component is supported as both a processor and an output. In both cases, it allows you to execute an arbitrary SQL query for each message. A basic processor/output configuration using sql_raw
and bit.io looks like:
label: ""
sql_insert:
driver: postgres
dsn: <your-bitio-connection-string>
query: <your-query>
args_mapping: <param-mapping-for-query>
for other configuration options, consult the Benthos docs.
Using Benthos with bit.io
Let's set up a working data pipeline using Benthos and bit.io! In our example, we'll use our bit.io database to count page-view events for a hypothetical website, and log our page views using Benthos. To get started, let's first create a table for tracking page views in our bit.io database:
CREATE TABLE page_views (
"path" TEXT UNIQUE,
"count" INTEGER NOT NULL DEFAULT 1
);
Next, let's write a simple Benthos configuration to upsert records sent to Benthos via HTTP POST into our bit.io database:
http:
enabled: true
address: 0.0.0.0:4195
root_path: /benthos
debug_endpoints: false
input:
label: ""
http_server:
path: /post
pipeline:
threads: 1
processors: []
output:
label: ""
sql_raw:
driver: postgres
dsn: <your-bitio-connection-string>
query: >-
INSERT INTO page_views ("path") VALUES ($1)
ON CONFLICT ("path") DO
UPDATE SET count = EXCLUDED.count + 1;
args_mapping: |
root = [
this.path,
]
Run Benthos:
$ benthos -c config.yaml
And make some requests:
$ for i in {1..10}; do curl -X POST http://0.0.0.0:4195/post -d '{"path":"/index"}'; done
$ for i in {1..20}; do curl -X POST http://0.0.0.0:4195/post -d '{"path":"/"}'; done
$ for i in {1..5}; do curl -X POST http://0.0.0.0:4195/post -d '{"path":"/about"}'; done
$ for i in {1..14}; do curl -X POST http://0.0.0.0:4195/post -d '{"path":"/robots.txt"}'; done
As we can see, the page view records were populated in our database as expected:

Updated 11 months ago