Skip to main content

Extracting usage data from a Pigment workspace

  • September 12, 2025
  • 1 reply
  • 75 views

Forum|alt.badge.img+3

Note: This is only available to customers on an Enterprise plan.

 

Python script to extract audit data using the Pigment Audit API, for more information please read these Community articles:

1. Generate Pigment Audit API key

https://community.pigment.com/security-permissions-82/manage-api-keys-226

2. Audit API events and event types

https://community.pigment.com/security-permissions-82/audit-logs-api-events-and-event-types-1770

3. Pigment API rate limits

https://community.pigment.com/importing-and-exporting-data-95/rate-limits-for-pigment-public-api-3170?tid=3170&fid=95

 

Please note, this is a complex Python script, so Python docstrings are used extensively to explain the functionality used in the script. 

#!/usr/bin/env python3
"""A command-line tool to fetch Pigment audit logs and import them as a CSV.

This script provides a robust way to extract large volumes of audit data from
the Pigment API, process it into a CSV file with density-sorted columns, and
then upload it to a Pigment import configuration in size-limited chunks.

Key Features:
- Handles large datasets by caching to disk, using minimal memory.
- Automatically retries on temporary network or API errors.
- Provides progress bars for both download and upload operations.
- Securely loads API keys from environment variables.
- Cleans up the intermediate CSV file by default after a successful import.

Example Usage:
# Set environment variables first
export PIGMENT_AUDIT_API_KEY='your_audit_key'
export PIGMENT_IMPORT_API_KEY='your_import_key'

# Run the script to export and import data
python your_script_name.py \
--start-date 2025-01-01 \
--end-date 2025-01-31 \
--output-file "january_logs.csv" \
--import-config "your_config_id_here"
"""

import requests
import csv
from datetime import datetime, timedelta
from dateutil import parser
import sys
import json
import tempfile
import logging
import os
import io
import argparse
from collections import Counter
from typing import Generator, Any
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from tqdm import tqdm

# --- 1. Configure Logging (tqdm-aware) ---
class TqdmLoggingHandler(logging.Handler):
"""A logging handler that uses tqdm.write() to prevent conflicts with bars."""
def __init__(self, level=logging.NOTSET):
super().__init__(level)
def emit(self, record):
try:
msg = self.format(record)
tqdm.write(msg)
self.flush()
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
self.handleError(record)

log = logging.getLogger()
log.setLevel(logging.INFO)
for handler in log.handlers[:]:
log.removeHandler(handler)
handler = TqdmLoggingHandler()
formatter = logging.Formatter('%(asctime)s - [%(levelname)s] - %(message)s', '%Y-%m-%d %H:%M:%S')
handler.setFormatter(formatter)
log.addHandler(handler)


# --- 2. Helper Functions & API Client Class ---

def valid_date(s: str) -> str:
"""Validates that a string is in YYYY-MM-DD format for argparse.

Args:
s: The input string from the command line.

Returns:
The validated date string.

Raises:
argparse.ArgumentTypeError: If the string is not in the correct format.
"""
try:
datetime.strptime(s, "%Y-%m-%d")
return s
except ValueError:
msg = f"Not a valid date: '{s}'. Expected format: YYYY-MM-DD."
raise argparse.ArgumentTypeError(msg)

def flatten_json(nested_json: dict) -> dict:
"""Flattens a nested JSON object into a single-level dictionary.

Nested keys are concatenated with an underscore. For example:
{'a': {'b': 1}} becomes {'a_b': 1}.

Args:
nested_json: The dictionary to flatten.

Returns:
A flattened dictionary.
"""
out = {}
def flatten(x: Any, name: str = ''):
if isinstance(x, dict):
for a in x:
flatten(x[a], name + a + '_')
elif isinstance(x, list):
for i, a in enumerate(x):
flatten(a, name + str(i) + '_')
else:
out[name[:-1]] = x
flatten(nested_json)
return out

class PigmentAPIClient:
"""A client for interacting with the Pigment API, with built-in retries.

This class encapsulates all direct communication with the Pigment Audit and
Import APIs, handling authentication, session management, and network
resilience.
"""
API_AUDIT_URL = 'https://pigment.app/api/audit/v1/events'
API_IMPORT_URL = "https://pigment.app/api/import/push/csv"

def __init__(self, audit_key: str, import_key: str | None = None):
"""Initializes the API client.

Args:
audit_key: The API key for the Audit API.
import_key: The optional API key for the Import API.
"""
if not audit_key:
raise ValueError("Audit API key is required.")
self._audit_headers = {'Authorization': f"Bearer {audit_key}", 'Content-Type': 'application/json; charset=utf-8'}
self._import_headers = {'Authorization': f"Bearer {import_key}", 'Content-Type': 'application/csv; charset=utf-8'} if import_key else {}
self._session = self._create_retry_session()

def _create_retry_session(self) -> requests.Session:
"""Creates a requests session with automatic retries on server errors."""
session = requests.Session()
retry_strategy = Retry(
total=5,
status_forcelist=[429, 500, 502, 503, 504],
backoff_factor=1,
respect_retry_after_header=True
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
return session

def get_audit_events(self, start_date: str, end_date_exclusive: str) -> Generator[dict, None, None]:
"""Yields all audit events for a given period, handling pagination.

Args:
start_date: The start date in YYYY-MM-DD format.
end_date_exclusive: The exclusive end date in YYYY-MM-DD format.

Yields:
A dictionary representing a single audit event.

Raises:
requests.exceptions.HTTPError: If a non-retryable HTTP error occurs.
"""
params = {'ingestedSince': start_date}
while True:
try:
response = self._session.get(self.API_AUDIT_URL, params=params, headers=self._audit_headers)
response.raise_for_status()
data = response.json()
events = data.get('events', [])
if not events: break
for event in events:
if parser.parse(event['eventTimestamp']).strftime("%Y-%m-%d") >= end_date_exclusive:
return
yield event
if data.get('hasMoreEvents') and 'nextEventsCursor' in data:
params['cursor'] = data['nextEventsCursor']
else: break
except requests.exceptions.HTTPError as e:
if e.response.status_code in [429, 500, 502, 503, 504]:
logging.warning(f"Received status {e.response.status_code}. Retrying...")
else:
logging.error(f"HTTP Error fetching events: {e.response.status_code} - {e.response.text}")
raise
except Exception as e:
logging.error(f"An unexpected error occurred while fetching events: {e}", exc_info=True)
raise

def push_csv_chunk(self, payload_bytes: bytes, import_config: str, chunk_number: int) -> None:
"""Pushes a single chunk of CSV data to the import API.

Args:
payload_bytes: The CSV data for the chunk, encoded as bytes.
import_config: The Pigment Import Configuration ID.
chunk_number: The sequential number of the chunk being sent.

Raises:
ValueError: If the import API key was not configured.
requests.exceptions.HTTPError: If a non-retryable HTTP error occurs.
"""
if not self._import_headers:
raise ValueError("Import API key not configured.")
logging.info(f"Sending chunk #{chunk_number} ({len(payload_bytes) / 1_048_576:.2f} MB)...")
try:
url = f"{self.API_IMPORT_URL}?configurationID={import_config}"
response = self._session.post(url, headers=self._import_headers, data=payload_bytes)
response.raise_for_status()
logging.info(f"Successfully sent chunk #{chunk_number}. Response: {response.status_code} - {response.json()}")
except requests.exceptions.HTTPError as e:
if e.response.status_code in [429, 500, 502, 503, 504]:
logging.warning(f"Received status {e.response.status_code} on upload. Retrying...")
else:
logging.error(f"HTTP Error sending chunk #{chunk_number}: {e.response.status_code} - {e.response.text}")
raise

# --- 3. Core Logic ---

def create_audit_csv(client: PigmentAPIClient, start_date: str, end_date: str, output_file: str) -> bool:
"""Fetches audit events, processes them, and writes them to a CSV file.

This function orchestrates the download process. It uses the PigmentAPIClient
to fetch events, caches them to a temporary file on disk, counts header
frequencies for column sorting, and finally writes the formatted CSV.

Args:
client: An initialized PigmentAPIClient instance.
start_date: The start date for the data pull (YYYY-MM-DD).
end_date: The end date for the data pull (YYYY-MM-DD).
output_file: The path where the final CSV file will be saved.

Returns:
True if the CSV was created successfully, False otherwise.
"""
end_date_exclusive = (datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)).strftime("%Y-%m-%d")
logging.info(f"Fetching data from {start_date} up to (but not including) {end_date_exclusive}")
header_counts = Counter()
event_count = 0
with tempfile.NamedTemporaryFile(mode='w+', encoding='utf-8', suffix='.jsonl') as temp_f:
logging.info(f"Caching data to temporary file: {temp_f.name}")
with tqdm(desc="Processing events", unit=" events", unit_scale=True, ncols=100) as pbar:
for event in client.get_audit_events(start_date, end_date_exclusive):
flattened_event = flatten_json(event)
header_counts.update(flattened_event.keys())
temp_f.write(json.dumps(flattened_event) + '\n')
event_count += 1
pbar.update(1)
logging.info(f"API pass complete. Cached {event_count} events to disk.")
if event_count == 0:
logging.warning("No event data found. CSV file will not be created.")
return False
sorted_headers = header_counts.most_common()
final_headers = [header for header, count in sorted_headers]
temp_f.seek(0)
logging.info(f"Writing data to final output file: '{output_file}'...")
with open(output_file, mode='w', newline='', encoding='utf-8') as final_csv_file:
writer = csv.DictWriter(final_csv_file, fieldnames=final_headers, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
for line in temp_f:
writer.writerow(json.loads(line))
logging.info("Successfully created CSV file.")
return True

def push_csv_in_chunks(client: PigmentAPIClient, file_path: str, import_config: str, chunk_size_mb: int) -> None:
"""Reads a large CSV and pushes it to Pigment in size-limited chunks.

Args:
client: An initialized PigmentAPIClient instance.
file_path: The path to the source CSV file.
import_config: The Pigment Import Configuration ID.
chunk_size_mb: The maximum size for each upload chunk in megabytes.
"""
logging.info(f"Preparing to push '{file_path}' in chunks of ~{chunk_size_mb}MB.")
chunk_size_bytes = chunk_size_mb * 1024 * 1024
try:
file_size = os.path.getsize(file_path)
with tqdm(total=file_size, unit='B', unit_scale=True, unit_divisor=1024,
desc=f"Uploading {os.path.basename(file_path)}", ncols=100) as pbar:
with open(file_path, 'r', encoding='utf-8', newline='') as source_file:
reader = csv.reader(source_file)
header = next(reader)
chunk_number = 1
string_io = io.StringIO()
writer = csv.writer(string_io, quoting=csv.QUOTE_MINIMAL)
writer.writerow(header)

for row in reader:
writer.writerow(row)
if string_io.tell() >= chunk_size_bytes:
payload = string_io.getvalue().encode('utf-8')
client.push_csv_chunk(payload, import_config, chunk_number)
pbar.update(len(payload))
chunk_number += 1
string_io.close()
string_io = io.StringIO()
writer = csv.writer(string_io)
writer.writerow(header)

final_payload = string_io.getvalue().encode('utf-8')
if len(final_payload) > len(','.join(header).encode('utf-8')) + 4:
client.push_csv_chunk(final_payload, import_config, chunk_number)
pbar.update(len(final_payload))

if pbar.n < pbar.total:
pbar.update(pbar.total - pbar.n)

logging.info("All chunks have been sent successfully.")
except FileNotFoundError:
logging.error(f"The file '{file_path}' was not found.")
except Exception as e:
logging.error(f"An unexpected error occurred during the chunking process: {e}", exc_info=True)
raise

# --- Main Execution Block ---
def main() -> None:
"""Main entry point for the script.

Parses command-line arguments, initializes the API client, and orchestrates
the process of creating the CSV and optionally pushing it to Pigment.
"""
arg_parser = argparse.ArgumentParser(
description="Fetch Pigment audit logs, save to CSV, and optionally import back to Pigment.",
formatter_class=argparse.RawTextHelpFormatter
)
arg_parser.add_argument("--start-date", required=True, type=valid_date, help="Start date for export in YYYY-MM-DD format.")
arg_parser.add_argument("--end-date", required=True, type=valid_date, help="End date for export in YYYY-MM-DD format.")
arg_parser.add_argument("--output-file", default="audit_data.csv", help="Path for the output CSV file.")
arg_parser.add_argument("--import-config", default=None, help="The Pigment Import Config ID. Triggers the upload.")
arg_parser.add_argument("--chunk-size", type=int, default=480, help="The upload chunk size in megabytes (MB). (default: 480)")
arg_parser.add_argument("--keep-file", action='store_true', help="Do not delete the intermediate CSV file after a successful upload.")
args = arg_parser.parse_args()

audit_key = os.getenv('PIGMENT_AUDIT_API_KEY')
import_key = os.getenv('PIGMENT_IMPORT_API_KEY')

if not audit_key:
logging.error("PIGMENT_AUDIT_API_KEY environment variable not set. This is required.")
sys.exit(1)

try:
client = PigmentAPIClient(audit_key, import_key)
success = create_audit_csv(client, args.start_date, args.end_date, args.output_file)
if success and args.import_config:
if not import_key:
logging.error("PIGMENT_IMPORT_API_KEY must be set to run an import.")
sys.exit(1)
push_csv_in_chunks(client, args.output_file, args.import_config, args.chunk_size)
if not args.keep_file:
logging.info(f"Upload complete. Deleting intermediate file: {args.output_file}")
try:
os.remove(args.output_file)
except OSError as e:
logging.error(f"Error deleting file {args.output_file}: {e}")
else:
logging.info(f"Upload complete. Intermediate file kept as requested: {args.output_file}")
elif success:
logging.info(f"CSV file created successfully at {args.output_file}. Skipping Pigment import.")

except KeyboardInterrupt:
logging.warning("\nProcess interrupted by user. Exiting.")
sys.exit(0)
except Exception as e:
logging.critical(f"A critical error stopped the process: {e}", exc_info=True)
sys.exit(1)

if __name__ == "__main__":
main()

To run this script, it requires the following environment variablew to be set:

Linux/BASH:

export PIGMENT_AUDIT_API_KEY='your_audit_key'
export PIGMENT_IMPORT_API_KEY='your_import_key'

Windows Shell:

set PIGMENT_AUDIT_API_KEY=your_audit_key
set PIGMENT_IMPORT_API_KEY=your_import_key

Windows PowerShell:

$env:PIGMENT_AUDIT_API_KEY = 'your_audit_key'
$env:PIGMENT_IMPORT_API_KEY = 'your_import_key'

Once defined the script requires the parameters passed to it:

Required Parameters

  • --start-date

    • Description: Start date for the data export in YYYY-MM-DD format.

    • Type: A valid date string.

  • --end-date

    • Description: End date for the data export in YYYY-MM-DD format.

    • Type: A valid date string.

Optional Parameters

  • --output-file

    • Description: Path for the output CSV file.

    • Default: audit_data.csv.

  • --import-config

    • Description: The Pigment Import Configuration ID. Providing this ID will trigger the automatic upload of the generated CSV to Pigment.

    • Default: None.

  • --chunk-size

    • Description: The upload chunk size in megabytes (MB) for the import process.

    • Default: 480.

  • --keep-file

    • Description: A flag that, when present, prevents the script from deleting the intermediate CSV file after a successful upload to Pigment.

    • Action: Stores True if included.

This example will export audit data from 1st September 2025 - 30th September 2025 and import into a defined Pigment import.

    python pigment_audit_tool.py \
--start-date 2025-09-01 \
--end-date 2025-09-30 \
--import-config "your_config_id_here"

This example will allow you to export the audit data to a CSV file:

    python pigment_audit_tool.py \
--start-date 2025-09-01 \
--end-date 2025-09-30 \
--output-file "september_logs.csv"

 

1 reply

Artsemi
Trendsetter
Forum|alt.badge.img+5
  • October 8, 2025

Hi Damian, thanks a lot for the script. It’s working perfectly!