We are often asked how to load enterprise-scale volumes of data from a MySQL database into Pigment. While Pigment doesn’t have a dedicated MySQL connector or ODBC capability, this can be achieved via Pigment’s Import API. Below is example code that connects to MySQL, executes a SQL statement, and pushes the result set into Pigment in batches.
1. Generate a Pigment Import API key:
https://community.pigment.com/security-permissions-82/manage-api-keys-226
2. Read how imports into Pigment are triggered via the API (to understand the process):
https://community.pigment.com/importing-and-exporting-data-95/how-to-trigger-an-import-with-apis-230
3. Load Data from MySQL RDBMS into Pigment via Pigment’s Import API using Python:
import mysql.connector
import getpass
import requests
from io import StringIO
import sys
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# MySQL Connection & SQL query
username = 'MYSQL_USERNAME'
server = 'MYSQL_SERVER'
database = 'MYSQL_DATABASE'
password = getpass.getpass(f'Enter password for {username}@{server}: ')
# Rows fetched per batch. 100,000 is a starting point; it can be increased as long as
# each batch stays under the 500MB POST limit.
fetch_size = 100000
# Note the date/time conversion performed in MySQL rather than Python
sql = """
SELECT channel_id, cust_id, prod_id, promo_id, DATE_FORMAT(time_id, '%Y-%m-%d') AS time_id, unit_cost, unit_price, amount_sold, quantity_sold, total_cost
FROM mysqltable
"""
# Pigment API import details
API_IMPORT_KEY = 'YOUR_PIGMENT_IMPORT_API_KEY'
API_IMPORT_URL = 'https://pigment.app/api/import/push/csv?configurationId='
PIGMENT_IMPORT_CONFIG = 'YOUR_IMPORT_ID'
CSV_SEPARATOR = ','
headers = {
    'Authorization': f'Bearer {API_IMPORT_KEY}'
}
# Execute SQL and push into Pigment
rowcount = 0
logging.info('Executing the following SQL statement...')
logging.info(sql)
logging.info(f'Now trying the import into Pigment Import {PIGMENT_IMPORT_CONFIG}...')
# Initialise both handles so the finally block is safe even if connect() fails
conn = None
cursor = None
try:
    conn = mysql.connector.connect(
        host=server,
        user=username,
        password=password,
        database=database
    )
    cursor = conn.cursor()
    cursor.execute(sql)
    while True:
        rows = cursor.fetchmany(fetch_size)
        if not rows:
            break
        # Serialise this batch as CSV text in memory (values are not quoted or escaped)
        buffer = StringIO()
        for row in rows:
            print(CSV_SEPARATOR.join(map(str, row)), file=buffer)
        rowcount += len(rows)
        # Send UTF-8 bytes so non-ASCII characters are encoded predictably
        payload = buffer.getvalue().encode('utf-8')
        response = requests.post(API_IMPORT_URL + PIGMENT_IMPORT_CONFIG, headers=headers, data=payload)
        # Raise HTTPError on a 4xx/5xx response so the handler below is reached
        response.raise_for_status()
        logging.info(f'{len(payload)} bytes in block')
        logging.info(f'{rowcount} rows loaded')
    logging.info('Data load complete')
except mysql.connector.Error as db_err:
    logging.error(f'Database error occurred: {db_err}')
except requests.exceptions.HTTPError as http_err:
    logging.error(f'HTTP error occurred: {http_err}')
except Exception as err:
    logging.error(f'An error occurred: {err}')
finally:
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()
Explanation:
- Import Libraries:
  - mysql.connector: For connecting to the MySQL database.
  - getpass: For securely getting the password input.
  - requests: For making HTTP requests to the Pigment API.
  - StringIO: For handling in-memory string buffers.
  - sys: For system-specific parameters and functions.
  - logging: For logging information and errors.
- MySQL Connection & SQL Query:
  - Prompts the user for the MySQL database password.
  - Defines the SQL query to fetch data from the MySQL database.
  - Sets a fixed fetch size of 100,000 rows.
- Pigment API Import Details:
  - Sets up the Pigment API import details, including the API key, URL, and headers.
- Execute SQL and Push into Pigment:
  - Connects to the MySQL database and executes the SQL query.
  - Fetches data in chunks and writes it to an in-memory buffer.
  - Sends the data to the Pigment API in chunks.
  - Logs the number of bytes and rows loaded.
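The script builds each CSV line with CSV_SEPARATOR.join(map(str, row)), which does not quote values that contain the separator and writes NULLs as the text None. A minimal sketch of an alternative using Python's standard csv module is shown here. The helper name rows_to_csv and the sample rows are hypothetical, and it assumes your Pigment import configuration accepts standard double-quoted CSV fields.

```python
import csv
from io import StringIO

def rows_to_csv(rows, separator=','):
    """Serialise an iterable of row tuples to CSV text (hypothetical helper)."""
    buffer = StringIO()
    # csv.writer quotes fields containing the separator or quote characters,
    # and writes None as an empty field
    writer = csv.writer(buffer, delimiter=separator, lineterminator='\n')
    writer.writerows(rows)
    return buffer.getvalue()

# Illustrative rows only: one value contains a comma, one is NULL, one contains quotes
sample = [(1, 'Acme, Inc.', None), (2, 'He said "hi"', 3.5)]
csv_text = rows_to_csv(sample)
print(csv_text)
```

In the main script, the returned text could replace the contents of the in-memory buffer before it is encoded and posted.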
This code will help you load data from MySQL RDBMS directly into Pigment using Pigment's Import API.
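The comment on fetch_size notes that a batch must stay under the 500MB POST limit, but the script itself does not check this. The sketch below shows one way the fetch loop could be wrapped in a generator that measures each batch before it is sent. The names iter_csv_payloads, MAX_POST_BYTES, and FakeCursor are hypothetical, and reading 500MB as 500 * 1024 * 1024 bytes is an assumption; FakeCursor only stands in for a real MySQL cursor so the example runs without a database.

```python
from io import StringIO

# Assumption: the 500MB POST limit is interpreted as 500 * 1024 * 1024 bytes
MAX_POST_BYTES = 500 * 1024 * 1024

def iter_csv_payloads(cursor, fetch_size, separator=',', max_bytes=MAX_POST_BYTES):
    """Yield (row_count, payload_bytes) for each fetchmany() batch (hypothetical helper)."""
    while True:
        rows = cursor.fetchmany(fetch_size)
        if not rows:
            return
        buffer = StringIO()
        for row in rows:
            buffer.write(separator.join(map(str, row)) + '\n')
        payload = buffer.getvalue().encode('utf-8')
        # Fail before posting if the batch is too large, so fetch_size can be lowered
        if len(payload) > max_bytes:
            raise ValueError(
                f'Batch of {len(rows)} rows is {len(payload)} bytes; reduce fetch_size'
            )
        yield len(rows), payload

class FakeCursor:
    """Stand-in for a database cursor so the sketch runs without MySQL."""
    def __init__(self, rows):
        self._rows = list(rows)

    def fetchmany(self, size):
        batch, self._rows = self._rows[:size], self._rows[size:]
        return batch

# Five illustrative rows fetched two at a time
batches = list(iter_csv_payloads(FakeCursor([(i, i * 2) for i in range(5)]), fetch_size=2))
for count, payload in batches:
    print(count, len(payload))
```

With a real cursor, each yielded payload would be passed as the data argument of requests.post in place of the buffer built inside the loop.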