Skip to main content

Often, we get the request to load enterprise volume data from MySQL RDBMS into Pigment. While Pigment doesn’t have a dedicated MySQL connector or ODBC capability, this can easily be achieved via Pigment’s import API. Below is example code that connects to MySQL, executes a SQL statement, and imports the data into Pigment.

 

1. Generate Pigment Import API key

https://community.pigment.com/security-permissions-82/manage-api-keys-226

 

2. For Imports into Pigment (to understand the process)

https://community.pigment.com/importing-and-exporting-data-95/how-to-trigger-an-import-with-apis-230

 

3. Load Data from MySQL RDBMS into Pigment via Pigment’s Import API using Python:

import mysql.connector
import getpass
import requests
from io import StringIO
import sys
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# MySQL Connection & SQL query
username = 'MYSQL_USERNAME'
server = 'MYSQL_SERVER'
database = 'MYSQL_DATABASE'
password = getpass.getpass(f'Enter password for {username}@{server}: ')
fetch_size = 100000 # Fixed fetch size, 100k rows can be increased providing the size of the fetch does not exceed the 500MB POST limit

# Note the date/time conversion performed in MySQL rather than Python
sql = """
SELECT channel_id, cust_id, prod_id, promo_id, DATE_FORMAT(time_id, '%Y-%m-%d') AS time_id, unit_cost, unit_price, amount_sold, quantity_sold, total_cost
FROM mysqltable
"""

# Pigment API import details
API_IMPORT_KEY = 'YOUR_PIGMENT_IMPORT_API_KEY'
API_IMPORT_URL = 'https://pigment.app/api/import/push/csv?configurationId='
PIGMENT_IMPORT_CONFIG = 'YOUR_IMPORT_ID'
CSV_SEPARATOR = ','
headers = {
'Authorization': f'Bearer {API_IMPORT_KEY}'
}

# Execute SQL and push into Pigment
rowcount = 0
logging.info('Executing the following SQL statement...')
logging.info(sql)
logging.info(f'Now trying the import into Pigment Import {PIGMENT_IMPORT_CONFIG}...')

try:
conn = mysql.connector.connect(
host=server,
user=username,
password=password,
database=database
)
cursor = conn.cursor()
cursor.execute(sql)
while True:
rows = cursor.fetchmany(fetch_size)
if not rows:
break
buffer = StringIO()
sys.stdout = buffer
for row in rows:
print(CSV_SEPARATOR.join(map(str, row)))
rowcount += len(rows)
sys.stdout = sys.__stdout__
data_size = len(buffer.getvalue().encode('utf-8'))
s = requests.post(API_IMPORT_URL + PIGMENT_IMPORT_CONFIG, headers=headers, data=buffer.getvalue())
logging.info(f'{data_size} bytes in block')
logging.info(f'{rowcount} rows loaded')
logging.info('Data load complete')
except mysql.connector.Error as db_err:
logging.error(f'Database error occurred: {db_err}')
except requests.exceptions.HTTPError as http_err:
logging.error(f'HTTP error occurred: {http_err}')
except Exception as err:
logging.error(f'An error occurred: {err}')
finally:
cursor.close()
conn.close()

Explanation:

  • Import Libraries:

    • mysql.connector: For connecting to the MySQL database.

    • getpass: For securely getting the password input.

    • requests: For making HTTP requests to the Pigment API.

    • StringIO: For handling in-memory string buffers.

    • sys: For system-specific parameters and functions.

    • logging: For logging information and errors.

  • MySQL Connection & SQL Query:

    • Prompts the user for the MySQL database password.

    • Defines the SQL query to fetch data from the MySQL database.

    • Sets a fixed fetch size of 100,000 rows.

  • Pigment API Import Details:

    • Sets up the Pigment API import details, including the API key, URL, and headers.

  • Execute SQL and Push into Pigment:

    • Connects to the MySQL database and executes the SQL query.

    • Fetches data in chunks and writes it to an in-memory buffer.

    • Sends the data to the Pigment API in chunks.

    • Logs the number of bytes and rows loaded.

This code will help you load data from MySQL RDBMS directly into Pigment using Pigment's Import API.

Be the first to reply!

Reply