We are often asked to load enterprise-volume data from an Oracle RDBMS into Pigment. Whilst Pigment doesn't have a dedicated Oracle RDBMS connector or ODBC capability, this can easily be achieved via Pigment's Import API. Below is example code that connects to Oracle, opens a cursor, executes a SQL statement and imports the data into Pigment.
1. Generate Pigment Import API key
https://community.pigment.com/security-permissions-82/manage-api-keys-226
2. For Imports into Pigment (to understand the process)
https://community.pigment.com/importing-and-exporting-data-95/how-to-trigger-an-import-with-apis-230
3. To load data from an Oracle RDBMS directly into Pigment via Pigment's Import API
import csv
import getpass
import logging
import sys
from io import StringIO

import oracledb
import requests
# Logging setup: timestamped messages at INFO level and above
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Oracle connection settings. The password is prompted for at run time
# rather than being stored in the script.
username = 'ORACLE_USERNAME'
connect_string = 'ORACLE_CONNECT_STRING'
password = getpass.getpass(f'Enter password for {username}@{connect_string}: ')

# Number of rows fetched (and posted) per batch. 100k rows can be increased
# providing the size of one batch does not exceed the 500MB POST limit.
fetch_size = 100_000

# Extraction query. Note the date/time conversion is performed in Oracle
# rather than in Python.
sql = """select channel_id, cust_id, prod_id, promo_id, to_char(time_id,'YYYY-MM-DD') as time_id,
unit_cost, unit_price, amount_sold, quantity_sold, total_cost
from oracletable"""

# Pigment Import API settings
API_IMPORT_KEY = 'YOUR_PIGMENT_IMPORT_API_KEY'
API_IMPORT_URL = 'https://pigment.app/api/import/push/csv?configurationId='
PIGMENT_IMPORT_CONFIG = 'YOUR_IMPORT_ID'
CSV_SEPARATOR = ','

# Every request authenticates with the import API key as a bearer token
headers = {'Authorization': f'Bearer {API_IMPORT_KEY}'}
# Execute SQL and push into Pigment
#
# Streams the query result in batches of `fetch_size` rows. Each batch is
# serialised to CSV in memory and POSTed to the Pigment Import API, so the
# full result set is never held in memory at once.
rowcount = 0
logging.info('Executing the following SQL statement...')
logging.info(sql)
logging.info(f"Now trying the import into Pigment Import {PIGMENT_IMPORT_CONFIG}...")
try:
    with oracledb.connect(user=username, password=password, dsn=connect_string) as c:
        with c.cursor() as cursor:
            cursor.execute(sql)
            while True:
                rows = cursor.fetchmany(fetch_size)
                if not rows:
                    break

                # csv.writer quotes values containing the separator, quotes
                # or newlines, and writes SQL NULL (None) as an empty field
                # instead of the literal text "None".
                buffer = StringIO(newline='')
                writer = csv.writer(buffer, delimiter=CSV_SEPARATOR, lineterminator='\n')
                writer.writerows(rows)

                # Encode once so the logged size is exactly what is sent and
                # non-ASCII values are transmitted as UTF-8.
                payload = buffer.getvalue().encode('utf-8')

                response = requests.post(
                    API_IMPORT_URL + PIGMENT_IMPORT_CONFIG,
                    headers=headers,
                    data=payload,
                )
                # Turn 4xx/5xx responses into HTTPError so a rejected batch
                # stops the load instead of being counted as loaded.
                response.raise_for_status()

                rowcount += len(rows)
                logging.info(f'{len(payload)} bytes in block')
                logging.info(f'{rowcount} rows loaded')
    logging.info("Data load complete")
except oracledb.DatabaseError as db_err:
    logging.error(f'Database error occurred: {db_err}')
except requests.exceptions.HTTPError as http_err:
    logging.error(f'HTTP error occurred: {http_err}')
except Exception as err:
    logging.error(f'An error occurred: {err}')
Explanation:
- Import Libraries:
  - `oracledb`: For connecting to the Oracle database.
  - `getpass`: For securely getting the password input.
  - `requests`: For making HTTP requests to the Pigment API.
  - `StringIO`: For handling in-memory string buffers.
  - `sys`: For system-specific parameters and functions.
  - `logging`: For logging information and errors.
- Oracle Connection & SQL Query:
  - Prompts the user for the Oracle database password.
  - Defines the SQL query to fetch data from the Oracle database.
  - Sets a fixed fetch size of 100,000 rows.
- Pigment API Import Details:
  - Sets up the Pigment API import details, including the API key, URL and headers.
- Execute SQL and Push into Pigment:
  - Connects to the Oracle database and executes the SQL query.
  - Fetches data in chunks and writes it to an in-memory buffer.
  - Sends the data to the Pigment API in chunks.
  - Logs the number of bytes and rows loaded.
NOTE: As an alternative to Python, this could be accomplished in PL/SQL, using Oracle’s ability to make REST API calls.
CREATE OR REPLACE PROCEDURE send_data_to_pigment AS
  -- Reads oracletable in batches, renders each batch as CSV text and POSTs it
  -- to the Pigment Import API. Raises ORA-20001 if Pigment answers a batch
  -- with a non-2xx HTTP status.

  -- The date is formatted in SQL so the CSV carries an unambiguous YYYY-MM-DD value.
  CURSOR c1 IS
    SELECT channel_id, cust_id, prod_id, promo_id, TO_CHAR(time_id, 'YYYY-MM-DD') AS time_id,
           unit_cost, unit_price, amount_sold, quantity_sold, total_cost
    FROM oracletable;

  TYPE t_row IS TABLE OF c1%ROWTYPE;
  v_data t_row;

  c_batch_rows  CONSTANT PLS_INTEGER := 1000;  -- rows fetched and posted per request
  c_chunk_chars CONSTANT PLS_INTEGER := 8000;  -- characters per write_text call

  l_http_request  UTL_HTTP.req;
  l_http_response UTL_HTTP.resp;
  l_url     VARCHAR2(32767) := 'https://pigment.app/api/import/push/csv?configurationId=YOUR_IMPORT_ID';
  l_api_key VARCHAR2(32767) := 'YOUR_PIGMENT_IMPORT_API_KEY';
  l_data    CLOB;
  l_length  PLS_INTEGER;
  l_offset  PLS_INTEGER;
  l_status  PLS_INTEGER;
  l_reason  VARCHAR2(256);
BEGIN
  OPEN c1;
  LOOP
    FETCH c1 BULK COLLECT INTO v_data LIMIT c_batch_rows;
    EXIT WHEN v_data.COUNT = 0;

    -- Start every batch with an empty body; otherwise each request would
    -- resend all rows from the previous batches.
    l_data := NULL;
    FOR i IN 1 .. v_data.COUNT LOOP
      l_data := l_data || v_data(i).channel_id || ',' || v_data(i).cust_id || ',' || v_data(i).prod_id || ',' ||
                v_data(i).promo_id || ',' || v_data(i).time_id || ',' || v_data(i).unit_cost || ',' ||
                v_data(i).unit_price || ',' || v_data(i).amount_sold || ',' || v_data(i).quantity_sold || ',' ||
                v_data(i).total_cost || CHR(10);
    END LOOP;

    l_http_request := UTL_HTTP.begin_request(l_url, 'POST', 'HTTP/1.1');
    UTL_HTTP.set_header(l_http_request, 'Content-Type', 'application/csv');
    UTL_HTTP.set_header(l_http_request, 'Authorization', 'Bearer ' || l_api_key);
    -- Chunked transfer encoding avoids declaring a Content-Length, which
    -- would have to be a byte count (LENGTH returns characters).
    UTL_HTTP.set_header(l_http_request, 'Transfer-Encoding', 'chunked');

    -- write_text accepts a VARCHAR2, so the CLOB is sent in slices that stay
    -- below the 32767-byte PL/SQL limit even for multi-byte characters.
    l_length := DBMS_LOB.getlength(l_data);
    l_offset := 1;
    WHILE l_offset <= l_length LOOP
      UTL_HTTP.write_text(l_http_request, DBMS_LOB.substr(l_data, c_chunk_chars, l_offset));
      l_offset := l_offset + c_chunk_chars;
    END LOOP;

    l_http_response := UTL_HTTP.get_response(l_http_request);
    l_status := l_http_response.status_code;
    l_reason := l_http_response.reason_phrase;
    -- Release the connection before deciding whether the batch succeeded.
    UTL_HTTP.end_response(l_http_response);

    IF l_status NOT BETWEEN 200 AND 299 THEN
      RAISE_APPLICATION_ERROR(-20001, 'Pigment import failed: HTTP ' || l_status || ' ' || l_reason);
    END IF;
  END LOOP;
  CLOSE c1;
EXCEPTION
  WHEN OTHERS THEN
    -- Close the cursor on failure, then let the caller see the original error.
    IF c1%ISOPEN THEN
      CLOSE c1;
    END IF;
    RAISE;
END send_data_to_pigment;
/