DatabaseService Documentation
Comprehensive DatabaseService Implementation Guide
The DatabaseService manages MySQL database connections, executes full-text search queries, discovers schemas, and monitors connection health. It provides connection pooling and encrypts stored credentials.
Service Overview
Responsibilities
The DatabaseService handles:
- Connection Management - Secure MySQL connection pooling and lifecycle management
- Query Execution - Full-text search query execution with optimization
- Schema Discovery - Database schema introspection and metadata extraction
- Security - Credential encryption and secure connection handling
- Health Monitoring - Connection health checks and performance monitoring
Architecture
export class DatabaseService {
constructor(
private encryptionService: EncryptionService,
private logger: Logger
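) {}
// One connection pool per registered database, keyed by database ID
private connectionPools = new Map<string, mysql.Pool>();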
// Core Methods
async addDatabase(userId: string, config: DatabaseConfig): Promise<Database>;
async removeDatabase(userId: string, databaseId: string): Promise<void>;
async testConnection(config: DatabaseConfig): Promise<ConnectionTestResult>;
async executeSearch(
databaseId: string,
query: SearchQuery
): Promise<SearchResult[]>;
async discoverSchema(databaseId: string): Promise<DatabaseSchema>;
// Connection Management
private async createConnection(
config: DatabaseConfig
): Promise<mysql.Connection>;
private async getConnectionPool(databaseId: string): Promise<mysql.Pool>;
private async releaseConnection(connection: mysql.Connection): Promise<void>;
}
Core Functionality
Database Connection Management
Adding Database Connections
The service validates the configuration, tests the connection, and encrypts the credentials before saving them:
interface DatabaseConfig {
name: string
host: string
port: number
database: string
username: string
password: string
ssl?: SSLConfig
connectionOptions?: ConnectionOptions
}
interface SSLConfig {
enabled: boolean
rejectUnauthorized?: boolean
ca?: string
cert?: string
key?: string
}
async addDatabase(userId: string, config: DatabaseConfig): Promise<Database> {
try {
// Validate configuration
this.validateDatabaseConfig(config)
// Test connection before saving
await this.testConnection(config)
// Encrypt sensitive credentials
const encryptedPassword = await this.encryptionService.encrypt(config.password)
const encryptedSSL = config.ssl ? await this.encryptionService.encrypt(JSON.stringify(config.ssl)) : null
// Save to database
const database = await this.saveDatabaseConfig({
...config,
userId,
password: encryptedPassword,
ssl: encryptedSSL,
status: 'connected',
createdAt: new Date(),
lastTested: new Date()
})
// Initialize connection pool
await this.initializeConnectionPool(database.id, config)
this.logger.info('Database added successfully', {
userId,
databaseId: database.id,
name: config.name
})
return database
} catch (error) {
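// Redact the password and the SSL material (which may include a private key) before logging
this.logger.error('Failed to add database', { error, userId, config: { ...config, password: '[REDACTED]', ssl: config.ssl ? '[REDACTED]' : undefined } })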
throw new AppError('DATABASE_ADD_FAILED', error.message)
}
}
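The validateDatabaseConfig step called by addDatabase is not shown in this guide. A minimal sketch of what such a check might look like follows; the names findConfigProblems and assertValidConfig, the specific rules, and the error message format are assumptions for illustration, not the service's actual implementation.

```typescript
// Minimal shape of the config being validated (mirrors DatabaseConfig above).
interface DatabaseConfigInput {
  name: string
  host: string
  port: number
  database: string
  username: string
  password: string
}

// Hypothetical helper: collects every problem instead of stopping at the first,
// so the caller can report them together.
function findConfigProblems(config: DatabaseConfigInput): string[] {
  const problems: string[] = []
  if (config.name.trim() === '') problems.push('name must not be empty')
  if (config.host.trim() === '') problems.push('host must not be empty')
  if (!Number.isInteger(config.port) || config.port < 1 || config.port > 65535) {
    problems.push('port must be an integer between 1 and 65535')
  }
  if (config.database.trim() === '') problems.push('database must not be empty')
  if (config.username.trim() === '') problems.push('username must not be empty')
  return problems
}

// Hypothetical throwing wrapper, in the spirit of validateDatabaseConfig.
function assertValidConfig(config: DatabaseConfigInput): void {
  const problems = findConfigProblems(config)
  if (problems.length > 0) {
    throw new Error(`VALIDATION_ERROR: ${problems.join('; ')}`)
  }
}
```

Returning a list of problems keeps the rules easy to unit test, while the wrapper gives addDatabase a single call that either passes or throws.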
Connection Pooling
The service uses MySQL connection pooling for efficient resource management:
interface ConnectionPoolConfig {
connectionLimit: number
acquireTimeout: number
timeout: number
charset: string
timezone: string
}
private async initializeConnectionPool(databaseId: string, config: DatabaseConfig): Promise<void> {
  const poolConfig: mysql.PoolOptions = {
    host: config.host,
    port: config.port,
    user: config.username,
    password: config.password,
    database: config.database,
    connectionLimit: config.connectionOptions?.connectionLimit || 10,
    acquireTimeout: config.connectionOptions?.acquireTimeout || 60000,
    timeout: config.connectionOptions?.timeout || 60000,
    charset: config.connectionOptions?.charset || 'utf8mb4',
    timezone: config.connectionOptions?.timezone || 'UTC',
    ssl: config.ssl?.enabled ? {
      rejectUnauthorized: config.ssl.rejectUnauthorized ?? true,
      ca: config.ssl.ca,
      cert: config.ssl.cert,
      key: config.ssl.key
    } : false
  }
  const pool = mysql.createPool(poolConfig)
  // Pool events are registered on the pool itself rather than passed as options.
  // 'connection' fires when a new connection is created; 'release' fires when
  // a connection is returned to the pool.
  pool.on('connection', (connection) => {
    this.logger.debug('Connection added to pool', { databaseId, connectionId: connection.threadId })
  })
  pool.on('release', (connection) => {
    this.logger.debug('Connection released back to pool', { databaseId, connectionId: connection.threadId })
  })
  this.connectionPools.set(databaseId, pool)
  // Test pool connectivity
  await this.testPoolConnection(pool)
}
private async testPoolConnection(pool: mysql.Pool): Promise<void> {
return new Promise((resolve, reject) => {
pool.getConnection((err, connection) => {
if (err) {
return reject(new AppError('CONNECTION_POOL_ERROR', err.message))
}
connection.ping((pingErr) => {
connection.release()
if (pingErr) {
return reject(new AppError('CONNECTION_TEST_FAILED', pingErr.message))
}
resolve()
})
})
})
}
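Other methods in this guide call this.getConnection(pool), which is not shown. Below is a minimal sketch of such a helper, assuming a callback-style pool like the one used in testPoolConnection. The CallbackPool and PooledConnection interfaces and the withConnection helper are hypothetical names introduced for illustration.

```typescript
// Minimal callback-style shapes, modelled on the pool API used above.
interface PooledConnection {
  release(): void
}
interface CallbackPool {
  getConnection(cb: (err: Error | null, connection?: PooledConnection) => void): void
}

// Hypothetical helper: wraps the callback API in a promise.
function getConnection(pool: CallbackPool): Promise<PooledConnection> {
  return new Promise((resolve, reject) => {
    pool.getConnection((err, connection) => {
      if (err || !connection) {
        reject(err ?? new Error('Pool returned no connection'))
        return
      }
      resolve(connection)
    })
  })
}

// Hypothetical helper: runs `work` with a pooled connection and always
// releases the connection, whether `work` resolves or rejects.
async function withConnection<T>(
  pool: CallbackPool,
  work: (connection: PooledConnection) => Promise<T>
): Promise<T> {
  const connection = await getConnection(pool)
  try {
    return await work(connection)
  } finally {
    connection.release()
  }
}
```

Centralizing acquisition and release in one helper makes it harder to leak a connection when a query throws.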
Search Query Execution
Full-Text Search Implementation
The service builds and runs MySQL full-text searches using MATCH ... AGAINST:
interface SearchQuery {
query: string
tables: string[]
columns: string[]
limit: number
offset: number
mode: 'natural' | 'boolean' | 'semantic'
filters?: SearchFilters
}
interface SearchFilters {
dateRange?: { from: string; to: string }
minScore?: number
exclude?: string[]
}
async executeSearch(databaseId: string, searchQuery: SearchQuery): Promise<SearchResult[]> {
  const startTime = Date.now()
  let connection: mysql.PoolConnection | undefined
  try {
    const pool = await this.getConnectionPool(databaseId)
    connection = await this.getConnection(pool)
    // Build optimized search query
    const { sql, params } = this.buildSearchQuery(searchQuery)
    this.logger.debug('Executing search query', {
      databaseId,
      sql: this.sanitizeQueryForLogging(sql),
      queryLength: searchQuery.query.length,
      tablesCount: searchQuery.tables.length
    })
    // Execute query with a 30-second timeout
    const results = await this.executeQueryWithTimeout(connection, sql, params, 30000)
    // Process and enhance results
    const processedResults = await this.processSearchResults(results, searchQuery)
    const executionTime = Date.now() - startTime
    this.logger.info('Search query completed', {
      databaseId,
      resultCount: processedResults.length,
      executionTime
    })
    return processedResults
  } catch (error) {
    const executionTime = Date.now() - startTime
    this.logger.error('Search query failed', {
      error,
      databaseId,
      query: searchQuery.query,
      executionTime
    })
    throw new AppError('SEARCH_QUERY_FAILED', (error as Error).message)
  } finally {
    // Return the connection to the pool on success and on failure
    connection?.release()
  }
}
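executeSearch relies on executeQueryWithTimeout, which is not shown in this guide. One common way to bound the wait is to race the query against a timer. The sketch below assumes that approach; withTimeout is a hypothetical name, and it only stops waiting on the client side, it does not cancel the query on the MySQL server.

```typescript
// Hypothetical helper: rejects if `work` does not settle within `timeoutMs`.
async function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Query timed out after ${timeoutMs}ms`)),
      timeoutMs
    )
  })
  try {
    // Whichever promise settles first decides the outcome.
    return await Promise.race([work, timeout])
  } finally {
    // Clear the timer so it does not keep the process alive after a fast query.
    clearTimeout(timer)
  }
}
```

A caller would wrap the driver call, for example `withTimeout(runQuery(), 30000)`, where runQuery stands in for whatever function actually issues the SQL.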
private buildSearchQuery(searchQuery: SearchQuery): { sql: string; params: any[] } {
  const { query, tables, columns, limit, offset, mode, filters } = searchQuery
  // Paging values are interpolated into the SQL, so reject anything that is not a non-negative integer
  if (!Number.isInteger(limit) || limit < 0 || !Number.isInteger(offset) || offset < 0) {
    throw new AppError('SEARCH_QUERY_FAILED', 'limit and offset must be non-negative integers')
  }
  // Escape the search text and quote the column identifiers
  const escapedQuery = mysql.escape(query)
  const escapedColumns = columns.map(col => mysql.escapeId(col))
  // Build MATCH AGAINST clause based on mode.
  // 'semantic' has no SQL-level equivalent here and falls back to natural language mode.
  const modifier = mode === 'boolean' ? 'IN BOOLEAN MODE' : 'IN NATURAL LANGUAGE MODE'
  const matchClause = `MATCH(${escapedColumns.join(',')}) AGAINST(${escapedQuery} ${modifier})`
  // Build WHERE conditions and their parameters together so the order always matches
  const whereConditions = [matchClause]
  const filterParams: any[] = []
  const minScore = filters?.minScore
  if (typeof minScore === 'number' && Number.isFinite(minScore)) {
    whereConditions.push(`${matchClause} >= ?`)
    filterParams.push(minScore)
  }
  if (filters?.dateRange) {
    // Assumes every searched table has a created_at column
    whereConditions.push('created_at BETWEEN ? AND ?')
    filterParams.push(filters.dateRange.from, filters.dateRange.to)
  }
  for (const term of filters?.exclude ?? []) {
    whereConditions.push(`NOT (${escapedColumns.map(col => `${col} LIKE ?`).join(' OR ')})`)
    escapedColumns.forEach(() => filterParams.push(`%${term}%`))
  }
  // Build one SELECT per table. SELECT * combined with UNION ALL requires
  // every searched table to have the same column layout.
  const tableQueries = tables.map(table => `
    SELECT *, ${matchClause} AS relevance_score, ${mysql.escape(table)} AS source_table
    FROM ${mysql.escapeId(table)}
    WHERE ${whereConditions.join(' AND ')}
  `)
  // Combine with UNION ALL and sort the combined result by relevance
  const sql = `
    ${tableQueries.join(' UNION ALL ')}
    ORDER BY relevance_score DESC
    LIMIT ${limit} OFFSET ${offset}
  `
  // Every per-table SELECT repeats the filter placeholders, so repeat the parameters once per table
  const params = tables.flatMap(() => filterParams)
  return { sql, params }
}
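The exclude filter only works if placeholders and parameters stay in the same order and count. The standalone sketch below isolates that step so the invariant is easy to see. quoteIdentifier is a hypothetical stand-in for mysql.escapeId that handles only simple, unqualified names, and buildExcludeClause is an illustrative name.

```typescript
// Hypothetical stand-in for identifier quoting on simple (unqualified) names:
// wrap in backticks and double any embedded backtick.
function quoteIdentifier(name: string): string {
  return '`' + name.replace(/`/g, '``') + '`'
}

// Builds one "NOT (col1 LIKE ? OR col2 LIKE ?)" group per excluded term, plus
// the parameters in the same left-to-right order as the placeholders.
function buildExcludeClause(
  columns: string[],
  terms: string[]
): { sql: string; params: string[] } {
  const quoted = columns.map(quoteIdentifier)
  const conditions: string[] = []
  const params: string[] = []
  for (const term of terms) {
    conditions.push(`NOT (${quoted.map(col => `${col} LIKE ?`).join(' OR ')})`)
    // One parameter per column, because each column contributes one placeholder.
    quoted.forEach(() => params.push(`%${term}%`))
  }
  return { sql: conditions.join(' AND '), params }
}

const clause = buildExcludeClause(['title', 'content'], ['draft', 'spam'])
console.log(clause.sql)
// NOT (`title` LIKE ? OR `content` LIKE ?) AND NOT (`title` LIKE ? OR `content` LIKE ?)
console.log(clause.params.length) // 4
```

Note that the terms are wrapped in % wildcards as-is, so a % or _ inside a term is treated as a LIKE wildcard rather than a literal character.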
Schema Discovery
Database Schema Introspection
The service discovers database structure and indexes:
interface DatabaseSchema {
database: string
version: string
characterSet: string
collation: string
tables: TableSchema[]
searchOptimizations: SearchOptimization[]
}
interface TableSchema {
name: string
engine: string
rowCount: number
dataSize: string
indexSize: string
columns: ColumnSchema[]
indexes: IndexSchema[]
searchable: boolean
fullTextColumns: string[]
}
interface ColumnSchema {
name: string
type: string
nullable: boolean
key: string
default: any
extra: string
}
interface IndexSchema {
name: string
type: 'BTREE' | 'FULLTEXT' | 'HASH'
unique: boolean
columns: string[]
}
async discoverSchema(databaseId: string): Promise<DatabaseSchema> {
  let connection: mysql.PoolConnection | undefined
  try {
    const pool = await this.getConnectionPool(databaseId)
    connection = await this.getConnection(pool)
    const conn = connection
    // Get database information
    const dbInfo = await this.getDatabaseInfo(conn)
    // Get all tables
    const tables = await this.getTablesInfo(conn)
    // Get detailed schema for each table (queries on one connection run sequentially)
    const tableSchemas = await Promise.all(
      tables.map(table => this.getTableSchema(conn, table.name))
    )
    // Analyze search optimization opportunities
    const searchOptimizations = await this.analyzeSearchOptimizations(tableSchemas)
    const schema: DatabaseSchema = {
      database: dbInfo.database,
      version: dbInfo.version,
      characterSet: dbInfo.characterSet,
      collation: dbInfo.collation,
      tables: tableSchemas,
      searchOptimizations
    }
    this.logger.info('Schema discovery completed', {
      databaseId,
      tablesCount: tableSchemas.length,
      searchableTablesCount: tableSchemas.filter(t => t.searchable).length
    })
    return schema
  } catch (error) {
    this.logger.error('Schema discovery failed', { error, databaseId })
    throw new AppError('SCHEMA_DISCOVERY_FAILED', (error as Error).message)
  } finally {
    // Return the connection to the pool on success and on failure
    connection?.release()
  }
}
private async getTableSchema(connection: mysql.Connection, tableName: string): Promise<TableSchema> {
// Get column information
const columns = await this.queryDatabase(connection, `
SELECT
COLUMN_NAME as name,
DATA_TYPE as type,
IS_NULLABLE = 'YES' as nullable,
COLUMN_KEY as \`key\`,
COLUMN_DEFAULT as \`default\`,
EXTRA as extra
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = ? AND TABLE_SCHEMA = DATABASE()
ORDER BY ORDINAL_POSITION
`, [tableName])
// Get index information
const indexes = await this.queryDatabase(connection, `
SELECT
INDEX_NAME as name,
INDEX_TYPE as type,
NOT NON_UNIQUE as \`unique\`,
GROUP_CONCAT(COLUMN_NAME ORDER BY SEQ_IN_INDEX) as columns
FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_NAME = ? AND TABLE_SCHEMA = DATABASE()
GROUP BY INDEX_NAME, INDEX_TYPE, NON_UNIQUE
`, [tableName])
// Get table stats (TABLE_ROWS is an estimate for InnoDB tables, not an exact count)
const tableStats = await this.queryDatabase(connection, `
SELECT
ENGINE as engine,
TABLE_ROWS as rowCount,
ROUND(DATA_LENGTH/1024/1024, 2) as dataSizeMB,
ROUND(INDEX_LENGTH/1024/1024, 2) as indexSizeMB
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = ? AND TABLE_SCHEMA = DATABASE()
`, [tableName])
const stats = tableStats[0] || {}
// Identify full-text searchable columns
const fullTextIndexes = indexes.filter(idx => idx.type === 'FULLTEXT')
const fullTextColumns = fullTextIndexes.flatMap(idx =>
idx.columns.split(',').map(col => col.trim())
)
// Determine if table is searchable
const textColumns = columns.filter(col =>
['text', 'mediumtext', 'longtext', 'varchar'].some(type =>
col.type.toLowerCase().includes(type)
)
)
return {
name: tableName,
engine: stats.engine || 'Unknown',
rowCount: stats.rowCount || 0,
dataSize: `${stats.dataSizeMB || 0}MB`,
indexSize: `${stats.indexSizeMB || 0}MB`,
columns: columns.map(col => ({
name: col.name,
type: col.type,
nullable: Boolean(col.nullable), // the query returns 1 or 0
key: col.key,
default: col.default,
extra: col.extra
})),
indexes: indexes.map(idx => ({
name: idx.name,
type: idx.type,
unique: Boolean(idx.unique), // the query returns 1 or 0
columns: idx.columns.split(',').map(col => col.trim())
})),
searchable: fullTextColumns.length > 0 || textColumns.length > 0,
fullTextColumns: [...new Set(fullTextColumns)]
}
}
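discoverSchema passes the table schemas to analyzeSearchOptimizations, which is not shown. As a rough sketch of one thing such an analysis might do, the function below suggests a FULLTEXT index for text columns that no FULLTEXT index covers yet. The IndexSuggestion shape, the suggestFullTextIndexes name, and the ft_ index naming are assumptions; the real SearchOptimization type may look different.

```typescript
// Trimmed-down shapes mirroring TableSchema and ColumnSchema above.
interface ColumnInfo { name: string; type: string }
interface TableInfo {
  name: string
  columns: ColumnInfo[]
  fullTextColumns: string[]
}

// Hypothetical suggestion shape.
interface IndexSuggestion {
  table: string
  columns: string[]
  statement: string
}

// Substring match: 'char' covers char/varchar, 'text' covers tinytext..longtext.
const TEXT_TYPES = ['char', 'text']

function suggestFullTextIndexes(tables: TableInfo[]): IndexSuggestion[] {
  const suggestions: IndexSuggestion[] = []
  for (const table of tables) {
    const indexed = new Set(table.fullTextColumns)
    const missing = table.columns
      .filter(col => TEXT_TYPES.some(t => col.type.toLowerCase().includes(t)))
      .map(col => col.name)
      .filter(name => !indexed.has(name))
    if (missing.length === 0) continue
    const cols = missing.map(c => '`' + c + '`').join(', ')
    suggestions.push({
      table: table.name,
      columns: missing,
      statement: `ALTER TABLE \`${table.name}\` ADD FULLTEXT INDEX \`ft_${table.name}\` (${cols})`
    })
  }
  return suggestions
}
```

The generated statement is meant as a suggestion for review, not something to run automatically, since adding a FULLTEXT index on a large table can be expensive.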
Connection Health Monitoring
Health Checks and Monitoring
The service derives a health status from response time, pool utilization, and query metrics:
interface ConnectionHealth {
status: 'healthy' | 'degraded' | 'unhealthy'
responseTime: number
activeConnections: number
maxConnections: number
connectionUtilization: number
uptime: number
lastError: Error | null
metrics: HealthMetrics
}
interface HealthMetrics {
queriesPerSecond: number
averageQueryTime: number
slowQueries: number
connectionErrors: number
lastHealthCheck: Date
}
async checkConnectionHealth(databaseId: string): Promise<ConnectionHealth> {
const startTime = Date.now()
try {
const pool = await this.getConnectionPool(databaseId)
// Test basic connectivity
const connection = await this.getConnection(pool)
try {
// Execute health check query
await this.queryDatabase(connection, 'SELECT 1 as health_check, NOW() as server_time')
} finally {
// Return the connection to the pool even if the query fails
connection.release()
}
const responseTime = Date.now() - startTime
// Get pool statistics
const poolStats = this.getPoolStatistics(pool)
// Get performance metrics
const metrics = await this.getPerformanceMetrics(databaseId)
const health: ConnectionHealth = {
status: this.determineHealthStatus(responseTime, poolStats, metrics),
responseTime,
activeConnections: poolStats.activeConnections,
maxConnections: poolStats.maxConnections,
connectionUtilization: poolStats.maxConnections > 0 ? poolStats.activeConnections / poolStats.maxConnections : 0,
uptime: poolStats.uptime,
lastError: null,
metrics
}
this.logger.debug('Health check completed', {
databaseId,
status: health.status,
responseTime
})
return health
} catch (error) {
this.logger.error('Health check failed', { error, databaseId })
return {
status: 'unhealthy',
responseTime: Date.now() - startTime,
activeConnections: 0,
maxConnections: 0,
connectionUtilization: 0,
uptime: 0,
lastError: error,
metrics: {
queriesPerSecond: 0,
averageQueryTime: 0,
slowQueries: 0,
connectionErrors: 0,
lastHealthCheck: new Date()
}
}
}
}
private determineHealthStatus(
  responseTime: number,
  poolStats: any,
  metrics: HealthMetrics
): 'healthy' | 'degraded' | 'unhealthy' {
  // Use the reported utilization if present; otherwise derive it from the raw counts
  const utilization = poolStats.connectionUtilization ??
    (poolStats.maxConnections > 0 ? poolStats.activeConnections / poolStats.maxConnections : 0)
  // Unhealthy conditions
  if (responseTime > 5000) return 'unhealthy' // more than 5 seconds
  if (utilization > 0.9) return 'unhealthy' // more than 90% pool usage
  if (metrics.connectionErrors > 10) return 'unhealthy' // too many errors
  // Degraded conditions
  if (responseTime > 1000) return 'degraded' // more than 1 second
  if (utilization > 0.7) return 'degraded' // more than 70% pool usage
  if (metrics.slowQueries > 5) return 'degraded' // too many slow queries
  return 'healthy'
}
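The health check reads HealthMetrics from getPerformanceMetrics, which is not shown. One simple way to produce averageQueryTime, slowQueries, and queriesPerSecond is a sliding window of recent query durations. The sketch below assumes that approach; QueryMetricsWindow, the 60-second window, and the 1-second slow threshold are hypothetical choices.

```typescript
// Hypothetical in-memory recorder for recent query durations.
class QueryMetricsWindow {
  private samples: { at: number; durationMs: number }[] = []

  constructor(
    private windowMs: number = 60000,
    private slowThresholdMs: number = 1000
  ) {}

  // `now` is injectable so the class can be tested without real clocks.
  record(durationMs: number, now: number = Date.now()): void {
    this.samples.push({ at: now, durationMs })
  }

  snapshot(now: number = Date.now()) {
    // Drop samples that have fallen out of the window.
    this.samples = this.samples.filter(s => now - s.at <= this.windowMs)
    const count = this.samples.length
    const total = this.samples.reduce((sum, s) => sum + s.durationMs, 0)
    return {
      queriesPerSecond: count / (this.windowMs / 1000),
      averageQueryTime: count === 0 ? 0 : total / count,
      slowQueries: this.samples.filter(s => s.durationMs >= this.slowThresholdMs).length
    }
  }
}
```

Each executed query would call record with its duration, and the health check would call snapshot to fill in the corresponding HealthMetrics fields.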
Error Handling
Custom Error Types
The DatabaseService defines specific error types for different failure scenarios:
export enum DatabaseErrorCodes {
CONNECTION_FAILED = 'DATABASE_CONNECTION_FAILED',
QUERY_FAILED = 'DATABASE_QUERY_FAILED',
SCHEMA_DISCOVERY_FAILED = 'SCHEMA_DISCOVERY_FAILED',
POOL_EXHAUSTED = 'CONNECTION_POOL_EXHAUSTED',
TIMEOUT = 'DATABASE_TIMEOUT',
AUTHENTICATION_FAILED = 'DATABASE_AUTH_FAILED',
SSL_ERROR = 'DATABASE_SSL_ERROR',
}
class DatabaseError extends AppError {
constructor(
code: DatabaseErrorCodes,
message: string,
public databaseId?: string,
public originalError?: Error
) {
super(code, message, 500, {
databaseId,
originalError: originalError?.message,
});
}
}
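The guide does not show how low-level driver errors are translated into DatabaseErrorCodes. A minimal sketch of one possible mapping follows. The driver codes on the left are common Node.js and MySQL error codes, but the grouping, the classifyDriverError name, and the QUERY_FAILED fallback are assumptions for illustration.

```typescript
// Re-declared so the sketch is self-contained; values match the enum above.
enum DatabaseErrorCodes {
  CONNECTION_FAILED = 'DATABASE_CONNECTION_FAILED',
  QUERY_FAILED = 'DATABASE_QUERY_FAILED',
  SCHEMA_DISCOVERY_FAILED = 'SCHEMA_DISCOVERY_FAILED',
  POOL_EXHAUSTED = 'CONNECTION_POOL_EXHAUSTED',
  TIMEOUT = 'DATABASE_TIMEOUT',
  AUTHENTICATION_FAILED = 'DATABASE_AUTH_FAILED',
  SSL_ERROR = 'DATABASE_SSL_ERROR',
}

// Illustrative mapping only; extend it to the codes your driver actually emits.
const DRIVER_CODE_MAP = new Map<string, DatabaseErrorCodes>([
  ['ECONNREFUSED', DatabaseErrorCodes.CONNECTION_FAILED],
  ['ECONNRESET', DatabaseErrorCodes.CONNECTION_FAILED],
  ['ENOTFOUND', DatabaseErrorCodes.CONNECTION_FAILED],
  ['ETIMEDOUT', DatabaseErrorCodes.TIMEOUT],
  ['ER_LOCK_WAIT_TIMEOUT', DatabaseErrorCodes.TIMEOUT],
  ['ER_ACCESS_DENIED_ERROR', DatabaseErrorCodes.AUTHENTICATION_FAILED],
])

// Hypothetical helper: picks a service-level code for a driver error.
function classifyDriverError(error: { code?: string }): DatabaseErrorCodes {
  const mapped = error.code !== undefined ? DRIVER_CODE_MAP.get(error.code) : undefined
  return mapped ?? DatabaseErrorCodes.QUERY_FAILED
}
```

A caller could then construct the error with something like `new DatabaseError(classifyDriverError(err), err.message, databaseId, err)`.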
Error Recovery Strategies
Transient failures are retried with exponential backoff; non-retryable errors are rethrown immediately:
private async executeWithRetry<T>(
  operation: () => Promise<T>,
  maxRetries: number = 3,
  backoffMs: number = 1000
): Promise<T> {
  // maxRetries is the total number of attempts, including the first one
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await operation()
    } catch (error) {
      const err = error as Error
      // Give up on the last attempt, or when a retry cannot help
      if (attempt === maxRetries || !this.isRetryableError(err)) {
        throw err
      }
      // Exponential backoff: backoffMs, then 2x, 4x, ...
      const delay = backoffMs * Math.pow(2, attempt - 1)
      this.logger.warn(`Database operation failed (attempt ${attempt}/${maxRetries}), retrying`, {
        error: err.message,
        nextAttemptIn: delay
      })
      await this.sleep(delay)
    }
  }
  // Only reachable when maxRetries is less than 1
  throw new AppError('DATABASE_QUERY_FAILED', 'maxRetries must be at least 1')
}
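private isRetryableError(error: Error & { code?: string }): boolean {
  const retryableErrors = [
    'ECONNRESET',
    'ETIMEDOUT',
    'ENOTFOUND',
    'ER_LOCK_WAIT_TIMEOUT',
    'ER_LOCK_DEADLOCK'
  ]
  // Node.js and MySQL driver errors expose the code as error.code;
  // the name and message checks are kept as a fallback for wrapped errors
  return retryableErrors.some(code =>
    error.code === code || error.name === code || error.message.includes(code)
  )
}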
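To make the backoff arithmetic concrete, the sketch below lists the delays that executeWithRetry sleeps between attempts for a given maxRetries and backoffMs. backoffDelays and withFullJitter are hypothetical helpers; jitter is not part of the code above and is shown only as a common variation.

```typescript
// Delays slept between attempts: backoffMs * 2^(attempt - 1).
// No delay follows the final attempt, so there are maxRetries - 1 entries.
function backoffDelays(maxRetries: number, backoffMs: number): number[] {
  const delays: number[] = []
  for (let attempt = 1; attempt < maxRetries; attempt++) {
    delays.push(backoffMs * Math.pow(2, attempt - 1))
  }
  return delays
}

// Hypothetical variation: "full jitter" picks a random delay in [0, delayMs)
// so that many clients do not retry in lockstep.
function withFullJitter(delayMs: number, random: () => number = Math.random): number {
  return Math.floor(random() * delayMs)
}

console.log(backoffDelays(3, 1000)) // [ 1000, 2000 ]
console.log(backoffDelays(5, 250))  // [ 250, 500, 1000, 2000 ]
```

With the defaults (3 attempts, 1000 ms base), a call that fails every time waits 3 seconds in total before the final error is thrown, not counting the time spent in the operation itself.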
Performance Optimization
Query Optimization
The service can normalize generated SQL before execution:
interface QueryOptimization {
enableQueryCache: boolean
optimizeFullTextQueries: boolean
useQueryPreparedStatements: boolean
enableQueryLogging: boolean
}
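private optimizeQuery(sql: string, params: any[]): { sql: string; params: any[] } {
  // Collapse whitespace. Note that this also collapses runs of whitespace
  // inside any string literal that was inlined into the SQL.
  sql = sql.replace(/\s+/g, ' ').trim()
  // Add an index hint for full-text searches.
  // Caveats: idx_fulltext is a placeholder index name, only the first SELECT is
  // rewritten, and the hint syntax should be verified against the target MySQL
  // version (MySQL's documented index-level optimizer hints name the table as
  // well as the index, and unrecognized hints produce a warning).
  if (sql.includes('MATCH') && sql.includes('AGAINST')) {
    sql = sql.replace('SELECT', 'SELECT /*+ USE_INDEX(idx_fulltext) */')
  }
  // Rewrite "LIMIT n OFFSET m" as the equivalent "LIMIT m, n".
  // The two forms have the same meaning; this is a normalization, not a speedup.
  if (sql.includes('LIMIT')) {
    sql = sql.replace(/LIMIT (\d+) OFFSET (\d+)/, 'LIMIT $2, $1')
  }
  return { sql, params }
}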
Connection Pool Optimization
Pool limits can be adjusted from observed usage:
interface PoolOptimization {
connectionLimit: number
acquireTimeout: number
createTimeout: number
idleTimeout: number
reapInterval: number
}
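private optimizeConnectionPool(databaseId: string, usage: PoolUsage): void {
  const pool = this.connectionPools.get(databaseId)
  if (!pool) return
  const optimization: PoolOptimization = {
    // 50% headroom over the observed peak, as a whole number between 1 and 50
    connectionLimit: Math.min(Math.max(Math.ceil(usage.peakConnections * 1.5), 1), 50),
    acquireTimeout: usage.averageResponseTime * 3,
    createTimeout: 30000,
    idleTimeout: 300000,
    reapInterval: 30000
  }
  // Apply optimizations. Only connectionLimit and acquireTimeout are applied;
  // the other values are recorded for logging. Changing pool.config at runtime
  // relies on driver internals, so recreate the pool if the driver ignores it.
  pool.config.connectionLimit = optimization.connectionLimit
  pool.config.acquireTimeout = optimization.acquireTimeout
  this.logger.info('Connection pool optimized', {
    databaseId,
    optimization
  })
}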
Testing and Validation
Unit Testing
describe('DatabaseService', () => {
let databaseService: DatabaseService;
let mockEncryptionService: jest.Mocked<EncryptionService>;
let mockLogger: jest.Mocked<Logger>;
beforeEach(() => {
mockEncryptionService = {
encrypt: jest.fn(),
decrypt: jest.fn(),
};
mockLogger = {
info: jest.fn(),
error: jest.fn(),
debug: jest.fn(),
warn: jest.fn(),
};
databaseService = new DatabaseService(mockEncryptionService, mockLogger);
});
describe('addDatabase', () => {
it('should successfully add a database with valid configuration', async () => {
const config: DatabaseConfig = {
name: 'Test DB',
host: 'localhost',
port: 3306,
database: 'testdb',
username: 'testuser',
password: 'testpass',
};
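mockEncryptionService.encrypt.mockResolvedValue('encrypted_password');
// Stub the steps that would otherwise need a real MySQL server or data store
jest
.spyOn(databaseService, 'testConnection')
.mockResolvedValue({ status: 'success' } as ConnectionTestResult);
jest
.spyOn(databaseService as any, 'saveDatabaseConfig')
.mockImplementation(async (record: any) => ({ id: 'db123', ...record }));
jest
.spyOn(databaseService as any, 'initializeConnectionPool')
.mockResolvedValue(undefined);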
const result = await databaseService.addDatabase('user123', config);
expect(result).toHaveProperty('id');
expect(result.name).toBe(config.name);
expect(mockEncryptionService.encrypt).toHaveBeenCalledWith(
config.password
);
});
it('should throw error for invalid database configuration', async () => {
const config: DatabaseConfig = {
name: '',
host: 'localhost',
port: 3306,
database: 'testdb',
username: 'testuser',
password: 'testpass',
};
await expect(
databaseService.addDatabase('user123', config)
).rejects.toThrow(AppError); // addDatabase wraps validation failures in DATABASE_ADD_FAILED
});
});
describe('executeSearch', () => {
it('should execute search query and return results', async () => {
const searchQuery: SearchQuery = {
query: 'test search',
tables: ['articles'],
columns: ['title', 'content'],
limit: 10,
offset: 0,
mode: 'natural',
};
// Mock database connection and query execution
const mockResults = [
{
id: 1,
title: 'Test Article',
content: 'Test content',
relevance_score: 0.8,
},
];
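// Stub pool access so the test does not need a registered database
jest
.spyOn(databaseService as any, 'getConnectionPool')
.mockResolvedValue({});
jest
.spyOn(databaseService as any, 'getConnection')
.mockResolvedValue({ release: jest.fn() });
jest
.spyOn(databaseService as any, 'executeQueryWithTimeout')
.mockResolvedValue(mockResults);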
const results = await databaseService.executeSearch('db123', searchQuery);
expect(results).toHaveLength(1);
expect(results[0]).toHaveProperty('relevance_score');
});
});
});
Integration Testing
describe('DatabaseService Integration', () => {
let databaseService: DatabaseService;
let testDatabase: Database;
beforeAll(async () => {
// Setup test database
testDatabase = await setupTestDatabase();
databaseService = new DatabaseService(new EncryptionService(), logger);
});
afterAll(async () => {
await teardownTestDatabase(testDatabase);
});
it('should perform end-to-end database operations', async () => {
// Add database
const database = await databaseService.addDatabase(
'testuser',
testDbConfig
);
expect(database.id).toBeDefined();
// Test connection
const connectionTest = await databaseService.testConnection(testDbConfig);
expect(connectionTest.status).toBe('success');
// Discover schema
const schema = await databaseService.discoverSchema(database.id);
expect(schema.tables.length).toBeGreaterThan(0);
// Execute search
const searchResults = await databaseService.executeSearch(database.id, {
query: 'test',
tables: ['test_table'],
columns: ['content'],
limit: 10,
offset: 0,
mode: 'natural',
});
expect(searchResults).toBeDefined();
});
});
Best Practices
Security Best Practices
- Credential Encryption: Always encrypt database passwords at rest
- Connection Security: Use SSL/TLS connections in production
- SQL Injection Prevention: Use parameterized queries exclusively
- Access Control: Validate user permissions before database operations
- Audit Logging: Log all database operations for security auditing
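The EncryptionService used for credential encryption is not shown in this guide. As a minimal sketch of encrypting a password at rest, the helpers below use AES-256-GCM from Node's built-in crypto module. The function names, the payload layout, and the key handling are assumptions; a real deployment would load the 32-byte key from a secret manager rather than generate it in code.

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto'

// Hypothetical helpers. Payload layout: base64(iv).base64(authTag).base64(ciphertext)
function encryptSecret(plaintext: string, key: Buffer): string {
  const iv = randomBytes(12) // 96-bit nonce, fresh for every encryption
  const cipher = createCipheriv('aes-256-gcm', key, iv)
  const ciphertext = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()])
  const tag = cipher.getAuthTag()
  return [iv, tag, ciphertext].map(part => part.toString('base64')).join('.')
}

function decryptSecret(payload: string, key: Buffer): string {
  const [iv, tag, ciphertext] = payload.split('.').map(part => Buffer.from(part, 'base64'))
  if (!iv || !tag || !ciphertext) throw new Error('Malformed payload')
  const decipher = createDecipheriv('aes-256-gcm', key, iv)
  // Decryption throws if the ciphertext or tag was modified.
  decipher.setAuthTag(tag)
  return Buffer.concat([decipher.update(ciphertext), decipher.final()]).toString('utf8')
}
```

GCM is an authenticated mode, so a tampered ciphertext fails to decrypt instead of silently yielding a wrong password.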
Performance Best Practices
- Connection Pooling: Use appropriate pool sizes based on database capacity
- Query Optimization: Analyze and optimize slow queries regularly
- Index Strategy: Ensure proper indexes for full-text search
- Resource Monitoring: Monitor database performance and connection health
- Caching: Implement query result caching where appropriate
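As one illustration of query result caching, the sketch below is a small time-to-live cache that could sit in front of executeSearch. TtlCache is a hypothetical class, and the cache key format mentioned in the comment is an assumption; the guide does not describe how the service caches results.

```typescript
// Hypothetical TTL cache for search results, keyed by a string such as
// `${databaseId}:${JSON.stringify(searchQuery)}`.
class TtlCache<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>()

  // `now` is injectable so expiry can be tested without waiting.
  constructor(private ttlMs: number, private now: () => number = Date.now) {}

  get(key: string): V | undefined {
    const entry = this.entries.get(key)
    if (!entry) return undefined
    if (this.now() >= entry.expiresAt) {
      this.entries.delete(key) // expired: evict lazily on read
      return undefined
    }
    return entry.value
  }

  set(key: string, value: V): void {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs })
  }
}
```

A short TTL limits how stale results can get; entries would also need to be invalidated when the underlying tables change if freshness matters.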
Operational Best Practices
- Health Monitoring: Regular health checks and alerting
- Error Recovery: Implement retry logic for transient failures
- Resource Cleanup: Proper connection and resource cleanup
- Configuration Validation: Validate all database configurations
- Documentation: Maintain clear documentation of database schemas
The DatabaseService provides a robust foundation for MySQL database management in Altus 4, handling everything from secure connections to optimized query execution.