
We are often asked how to load enterprise-volume data from an Oracle RDBMS into Pigment. Although Pigment doesn't have a dedicated Oracle RDBMS connector or ODBC capability, this can easily be achieved via Pigment's Import API. Below is example code that connects to Oracle, opens a cursor, executes a SQL statement and imports the data into Pigment.
 

1. Generate a Pigment Import API key

https://community.pigment.com/security-permissions-82/manage-api-keys-226

 

2. Understand how imports into Pigment work

https://community.pigment.com/importing-and-exporting-data-95/how-to-trigger-an-import-with-apis-230

 

3. Load data from an Oracle RDBMS directly into Pigment via Pigment's Import API

import oracledb
import getpass
import requests
from io import StringIO
import sys
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Oracle Connection & SQL query
username = 'ORACLE_USERNAME'
connect_string = 'ORACLE_CONNECT_STRING'
password = getpass.getpass(f'Enter password for {username}@{connect_string}: ')
fetch_size = 100000  # Rows per fetch and per POST; can be increased as long as each block stays under the 500MB POST limit

# note the date/time conversion performed in Oracle rather than Python
sql = """select channel_id, cust_id, prod_id, promo_id, to_char(time_id,'YYYY-MM-DD') as time_id, 
                unit_cost, unit_price, amount_sold, quantity_sold, total_cost
         from oracletable"""

# Pigment API import details
API_IMPORT_KEY = 'YOUR_PIGMENT_IMPORT_API_KEY'
API_IMPORT_URL = 'https://pigment.app/api/import/push/csv?configurationId='
PIGMENT_IMPORT_CONFIG = 'YOUR_IMPORT_ID'
CSV_SEPARATOR = ','

headers = {
    'Authorization': f'Bearer {API_IMPORT_KEY}'
}

# Execute SQL and push into Pigment
rowcount = 0
logging.info('Executing the following SQL statement...')
logging.info(sql)
logging.info(f"Now trying the import into Pigment Import {PIGMENT_IMPORT_CONFIG}...")

try:
    with oracledb.connect(user=username, password=password, dsn=connect_string) as c:
        with c.cursor() as cursor:
            cursor.execute(sql)
            while True:
                rows = cursor.fetchmany(fetch_size)
                if not rows:
                    break
                # Build one CSV block in memory. NULLs become empty fields rather than the string 'None'.
                # Note: values are not quoted, so they must not contain the separator or newlines.
                buffer = StringIO()
                for row in rows:
                    buffer.write(CSV_SEPARATOR.join('' if value is None else str(value) for value in row) + '\n')
                    rowcount += 1
                # Encode once so the logged size matches the bytes actually sent
                payload = buffer.getvalue().encode('utf-8')
                response = requests.post(API_IMPORT_URL + PIGMENT_IMPORT_CONFIG, headers=headers, data=payload)
                # requests does not raise on 4xx/5xx responses unless asked to
                response.raise_for_status()
                logging.info(f'{len(payload)} bytes in block')
                logging.info(f'{rowcount} rows loaded')
    logging.info("Data load complete")
except oracledb.DatabaseError as db_err:
    logging.error(f'Database error occurred: {db_err}')
    sys.exit(1)
except requests.exceptions.RequestException as http_err:
    logging.error(f'HTTP error occurred: {http_err}')
    sys.exit(1)
except Exception as err:
    logging.error(f'An error occurred: {err}')
    sys.exit(1)

Explanation:

  • Import Libraries:
    • oracledb: For connecting to the Oracle database.

    • getpass: For securely getting the password input.

    • requests: For making HTTP requests to the Pigment API.

    • StringIO: For handling in-memory string buffers.

    • sys: For system-specific parameters and functions.

    • logging: For logging information and errors.

  • Oracle Connection & SQL Query:

    • Prompts the user for the Oracle database password.

    • Defines the SQL query to fetch data from the Oracle database.

    • Sets a fixed fetch size of 100,000 rows.

  • Pigment API Import Details:

    • Sets up the Pigment API import details, including the API key, URL and headers.

  • Execute SQL and Push into Pigment:

    • Connects to the Oracle database and executes the SQL query.

    • Fetches data in chunks and writes it to an in-memory buffer.

    • Sends the data to the Pigment API in chunks.

    • Logs the number of bytes and rows loaded.
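The "writes it to an in-memory buffer" step can be made more robust when the data contains text columns. The following is a minimal sketch, not part of the original script: it uses Python's standard csv module so that values containing the separator, quotes or newlines are quoted correctly, and it splits the output into blocks that stay under a byte limit (for example, the 500MB POST limit mentioned above). The function name rows_to_csv_blocks and the max_bytes parameter are hypothetical names chosen for illustration.

```python
import csv
from io import StringIO


def rows_to_csv_blocks(rows, max_bytes, separator=','):
    """Yield UTF-8 encoded CSV blocks, each at most max_bytes long.

    rows is any iterable of tuples, such as the result of cursor.fetchmany().
    """
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter=separator, lineterminator='\n')
    parts = []  # encoded lines collected for the current block
    size = 0    # total bytes in the current block
    for row in rows:
        # csv.writer quotes fields that contain the separator, quotes or
        # newlines, and writes None as an empty field.
        writer.writerow(row)
        line = buffer.getvalue().encode('utf-8')
        buffer.seek(0)
        buffer.truncate(0)
        if len(line) > max_bytes:
            raise ValueError('a single row exceeds max_bytes')
        # Start a new block when adding this line would exceed the limit
        if parts and size + len(line) > max_bytes:
            yield b''.join(parts)
            parts, size = [], 0
        parts.append(line)
        size += len(line)
    if parts:
        yield b''.join(parts)
```

Each yielded block could then be passed as the data argument of requests.post in place of the hand-built buffer, assuming the Pigment import configuration expects standard CSV quoting.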

 

NOTE: As an alternative to Python, the same load could be done in PL/SQL, using Oracle’s ability to make REST API calls.

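CREATE OR REPLACE PROCEDURE send_data_to_pigment AS
  CURSOR c1 IS
    SELECT channel_id, cust_id, prod_id, promo_id, TO_CHAR(time_id, 'YYYY-MM-DD') AS time_id,
           unit_cost, unit_price, amount_sold, quantity_sold, total_cost
    FROM oracletable;
  TYPE t_row IS TABLE OF c1%ROWTYPE;
  v_data t_row;
  l_http_request UTL_HTTP.req;
  l_http_response UTL_HTTP.resp;
  l_url VARCHAR2(32767) := 'https://pigment.app/api/import/push/csv?configurationId=YOUR_IMPORT_ID';
  l_api_key VARCHAR2(32767) := 'YOUR_PIGMENT_IMPORT_API_KEY';
  l_data CLOB;
  l_offset PLS_INTEGER;
  l_status PLS_INTEGER;
  c_chunk CONSTANT PLS_INTEGER := 8000; -- characters per write_text call
BEGIN
  -- HTTPS calls from the database typically need a network ACL and an Oracle wallet to be configured first
  OPEN c1;
  LOOP
    FETCH c1 BULK COLLECT INTO v_data LIMIT 1000;
    EXIT WHEN v_data.COUNT = 0;
    l_data := NULL; -- reset, otherwise rows from earlier batches are sent again
    FOR i IN 1..v_data.COUNT LOOP
      l_data := l_data || v_data(i).channel_id || ',' || v_data(i).cust_id || ',' || v_data(i).prod_id || ',' ||
                v_data(i).promo_id || ',' || v_data(i).time_id || ',' || v_data(i).unit_cost || ',' ||
                v_data(i).unit_price || ',' || v_data(i).amount_sold || ',' || v_data(i).quantity_sold || ',' ||
                v_data(i).total_cost || CHR(10);
    END LOOP;

    l_http_request := UTL_HTTP.begin_request(l_url, 'POST', 'HTTP/1.1');
    UTL_HTTP.set_header(l_http_request, 'Content-Type', 'application/csv');
    UTL_HTTP.set_header(l_http_request, 'Authorization', 'Bearer ' || l_api_key);
    -- Character count equals byte count only while the payload is single-byte (ASCII), as it is for this query
    UTL_HTTP.set_header(l_http_request, 'Content-Length', DBMS_LOB.getlength(l_data));
    -- write_text takes a VARCHAR2, so send the CLOB in pieces
    l_offset := 1;
    WHILE l_offset <= DBMS_LOB.getlength(l_data) LOOP
      UTL_HTTP.write_text(l_http_request, DBMS_LOB.substr(l_data, c_chunk, l_offset));
      l_offset := l_offset + c_chunk;
    END LOOP;

    l_http_response := UTL_HTTP.get_response(l_http_request);
    l_status := l_http_response.status_code;
    UTL_HTTP.end_response(l_http_response);
    IF l_status NOT BETWEEN 200 AND 299 THEN
      CLOSE c1;
      RAISE_APPLICATION_ERROR(-20001, 'Pigment import failed with HTTP status ' || l_status);
    END IF;
  END LOOP;
  CLOSE c1;
END send_data_to_pigment;
/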

 
