Database Connections
Connect Heimdall directly to your databases to query stored text, analyze it with Heimdall Read, and write the results back.
Supported Databases
Databricks
Connect to your Databricks SQL Warehouse for direct data access.
Configuration
# Databricks connection settings
databricks_config = {
    'server_hostname': 'your-workspace.cloud.databricks.com',
    'http_path': '/sql/1.0/warehouses/your-warehouse-id',
    'access_token': 'your-access-token'
}
Example Integration
import requests
from databricks import sql

# Connect to Databricks
connection = sql.connect(
    server_hostname=databricks_config['server_hostname'],
    http_path=databricks_config['http_path'],
    access_token=databricks_config['access_token']
)

# Query data and send to Heimdall
cursor = connection.cursor()
cursor.execute("SELECT * FROM your_table LIMIT 1000")
data = cursor.fetchall()

# Process with Heimdall Read
for row in data:
    text_data = row[0]  # Assuming the first column is text
    response = requests.post(
        'https://read.heimdallapp.org/read/v1/api/process',
        headers={
            'X-api-key': 'YOUR-API-KEY',
            'X-username': 'YOUR-USERNAME'
        },
        json={'text': text_data}
    )
    print(response.json())

# Close the connection when done
cursor.close()
connection.close()
MySQL
Connect to MySQL databases for data processing.
Configuration
import mysql.connector

# MySQL connection settings
mysql_config = {
    'host': 'your-mysql-host',
    'user': 'your-username',
    'password': 'your-password',
    'database': 'your-database'
}
Example Integration
import mysql.connector
import requests

# Connect to MySQL
connection = mysql.connector.connect(**mysql_config)
cursor = connection.cursor()

# Query data
cursor.execute("SELECT id, text_content FROM documents LIMIT 1000")
rows = cursor.fetchall()

# Process with Heimdall
for row in rows:
    doc_id, text_content = row
    # Analyze text with Heimdall Read
    response = requests.post(
        'https://read.heimdallapp.org/read/v1/api/process',
        headers={
            'X-api-key': 'YOUR-API-KEY',
            'X-username': 'YOUR-USERNAME'
        },
        json={'text': text_content}
    )
    if response.status_code == 200:
        analysis = response.json()
        # Store results back to the database
        cursor.execute("""
            UPDATE documents
            SET sentiment = %s, word_count = %s
            WHERE id = %s
        """, (analysis['sentiment'], analysis['word_count'], doc_id))

# Commit once after the loop and clean up
connection.commit()
cursor.close()
connection.close()
PostgreSQL
Connect to PostgreSQL databases and process query results with pandas.
Configuration
import psycopg2

# PostgreSQL connection settings
postgres_config = {
    'host': 'your-postgres-host',
    'user': 'your-username',
    'password': 'your-password',
    'database': 'your-database',
    'port': 5432
}
Example Integration
import psycopg2
import requests
import pandas as pd

# Connect to PostgreSQL
connection = psycopg2.connect(**postgres_config)

# Query data
query = "SELECT id, text_content, created_at FROM posts WHERE processed = false"
df = pd.read_sql(query, connection)

# Process with Heimdall
cursor = connection.cursor()  # One cursor reused for all updates
for _, row in df.iterrows():
    response = requests.post(
        'https://read.heimdallapp.org/read/v1/api/process',
        headers={
            'X-api-key': 'YOUR-API-KEY',
            'X-username': 'YOUR-USERNAME'
        },
        json={'text': row['text_content']}
    )
    if response.status_code == 200:
        analysis = response.json()
        # Update the database with the results
        cursor.execute("""
            UPDATE posts
            SET sentiment = %s,
                word_count = %s,
                processed = true
            WHERE id = %s
        """, (analysis['sentiment'], analysis['word_count'], row['id']))

connection.commit()
cursor.close()
connection.close()
MariaDB
Connect to MariaDB databases for data processing.
Configuration
import mariadb

# MariaDB connection settings
mariadb_config = {
    'host': 'your-mariadb-host',
    'user': 'your-username',
    'password': 'your-password',
    'database': 'your-database',
    'port': 3306
}
Example Integration
import mariadb
import requests

# Connect to MariaDB
connection = mariadb.connect(**mariadb_config)
cursor = connection.cursor()

# Query data
cursor.execute("SELECT id, content FROM articles WHERE analyzed = false")
rows = cursor.fetchall()

# Process with Heimdall
for row in rows:
    article_id, content = row
    # Analyze content with Heimdall Read
    response = requests.post(
        'https://read.heimdallapp.org/read/v1/api/process',
        headers={
            'X-api-key': 'YOUR-API-KEY',
            'X-username': 'YOUR-USERNAME'
        },
        json={'text': content}
    )
    if response.status_code == 200:
        analysis = response.json()
        # Store the analysis results (MariaDB Connector/Python
        # uses ? placeholders)
        cursor.execute("""
            UPDATE articles
            SET sentiment = ?,
                word_count = ?,
                analyzed = true
            WHERE id = ?
        """, (analysis['sentiment'], analysis['word_count'], article_id))

connection.commit()
cursor.close()
connection.close()
Batch Processing
Large Dataset Processing
Process large tables in fixed-size batches so memory use stays bounded and a single failed row does not halt the run.
import pandas as pd
import requests
from sqlalchemy import create_engine, text

def process_large_dataset(connection_string, table_name, text_column):
    # Connect to the database
    # Note: table_name and text_column are interpolated into SQL,
    # so only pass trusted values
    engine = create_engine(connection_string)

    # Rows are marked processed as they go, so each query fetches the
    # next unprocessed batch; an OFFSET here would skip rows
    batch_size = 1000
    processed_count = 0

    while True:
        # Query the next batch of unprocessed rows
        query = f"""
            SELECT id, {text_column}
            FROM {table_name}
            WHERE processed = false
            LIMIT {batch_size}
        """
        df = pd.read_sql(query, engine)
        if df.empty:
            break

        # Process the batch
        batch_success = 0
        for _, row in df.iterrows():
            try:
                response = requests.post(
                    'https://read.heimdallapp.org/read/v1/api/process',
                    headers={
                        'X-api-key': 'YOUR-API-KEY',
                        'X-username': 'YOUR-USERNAME'
                    },
                    json={'text': row[text_column]}
                )
                if response.status_code == 200:
                    analysis = response.json()
                    # Update the database
                    with engine.begin() as conn:
                        conn.execute(
                            text(f"""
                                UPDATE {table_name}
                                SET sentiment = :sentiment,
                                    word_count = :word_count,
                                    processed = true
                                WHERE id = :id
                            """),
                            {
                                'sentiment': analysis['sentiment'],
                                'word_count': analysis['word_count'],
                                'id': row['id']
                            }
                        )
                    batch_success += 1
            except Exception as e:
                print(f"Error processing row {row['id']}: {e}")

        if batch_success == 0:
            break  # Every row in the batch failed; stop instead of retrying forever

        processed_count += batch_success
        print(f"Processed {processed_count} records")
Real-time Processing
Stream Processing
Poll the database for unprocessed rows and analyze them concurrently as they arrive.
import asyncio
import aiohttp
import asyncpg

async def process_stream():
    # A connection pool lets the concurrent tasks below update the
    # database safely; a single asyncpg connection can only run one
    # query at a time
    pool = await asyncpg.create_pool(
        host='your-host',
        user='your-username',
        password='your-password',
        database='your-database'
    )

    # Monitor for new records
    async with aiohttp.ClientSession() as session:
        while True:
            # Query for unprocessed records
            rows = await pool.fetch("""
                SELECT id, content FROM posts
                WHERE processed = false
                LIMIT 10
            """)
            if not rows:
                await asyncio.sleep(1)  # Wait for new data
                continue

            # Process the records concurrently
            tasks = [
                process_single_record(session, pool, row['id'], row['content'])
                for row in rows
            ]
            await asyncio.gather(*tasks)
            await asyncio.sleep(0.1)  # Small delay between batches

async def process_single_record(session, pool, record_id, content):
    try:
        async with session.post(
            'https://read.heimdallapp.org/read/v1/api/process',
            headers={
                'X-api-key': 'YOUR-API-KEY',
                'X-username': 'YOUR-USERNAME'
            },
            json={'text': content}
        ) as response:
            if response.status == 200:
                analysis = await response.json()
                # Update the database; asyncpg uses $n placeholders
                await pool.execute("""
                    UPDATE posts
                    SET sentiment = $1,
                        word_count = $2,
                        processed = true
                    WHERE id = $3
                """, analysis['sentiment'], analysis['word_count'], record_id)
    except Exception as e:
        print(f"Error processing record {record_id}: {e}")
Best Practices
Connection Management
- Use connection pooling so each query reuses an open connection (see the sketch after this list)
- Implement retry logic with backoff for transient connection failures
- Monitor connection health regularly, e.g. with pre-ping checks
- Close connections and cursors promptly to avoid leaks
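A minimal sketch of pooling and retry using SQLAlchemy; the DSN, pool sizes, and retry counts are illustrative assumptions, not Heimdall requirements:

import time
from sqlalchemy import create_engine, text

# Hypothetical DSN; pool settings are illustrative starting points
engine = create_engine(
    'postgresql://your-username:your-password@your-postgres-host/your-database',
    pool_size=5,         # keep a small pool of reusable connections
    max_overflow=10,     # allow short bursts beyond the pool size
    pool_pre_ping=True,  # check connection health before each checkout
    pool_recycle=1800    # recycle connections older than 30 minutes
)

def fetch_with_retry(query, retries=3, backoff=1.0):
    # Retry transient failures with exponential backoff
    for attempt in range(retries):
        try:
            with engine.connect() as conn:  # returned to the pool on exit
                return conn.execute(text(query)).fetchall()
        except Exception:
            if attempt == retries - 1:
                raise
            time.sleep(backoff * 2 ** attempt)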
Security
- Use environment variables for credentials instead of hard-coding them (see the sketch after this list)
- Enable SSL/TLS so connections are encrypted in transit
- Implement access controls and least-privilege user permissions
- Keep database drivers patched with regular security updates
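A minimal sketch of loading the PostgreSQL settings above from environment variables and requiring SSL; the variable names are illustrative assumptions:

import os
import psycopg2

# Hypothetical variable names; set them in your deployment, never in code
postgres_config = {
    'host': os.environ['POSTGRES_HOST'],
    'user': os.environ['POSTGRES_USER'],
    'password': os.environ['POSTGRES_PASSWORD'],
    'database': os.environ['POSTGRES_DB'],
    'port': int(os.environ.get('POSTGRES_PORT', '5432')),
    'sslmode': 'require'  # refuse unencrypted connections
}
connection = psycopg2.connect(**postgres_config)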
Next Steps
Now that you can connect to databases:
- Monitor Performance - Track database integration performance
- Follow Best Practices - Learn production deployment tips
- Integrate APIs - Connect to your applications