Draft Documentation
This page is currently under development. Content may be incomplete or subject to change.
This page makes extensive use of AI-generated documentation.
Custom Automation#
SysGit's Python object model enables custom automation scripts tailored to your organization's specific workflows, tools, and processes. This section covers building custom integrations, batch operations, validation rules, and advanced automation patterns.
Python Script Template#
All custom automation scripts should follow this general structure:
import sysgit as sg
import argparse
import logging
import sys
# Configure logging: timestamp, level and message on every record
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the rest of the script
logger = logging.getLogger(__name__)
def parse_args(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings. When None (the default)
            argparse falls back to sys.argv[1:], so existing callers that
            pass nothing behave exactly as before.

    Returns:
        argparse.Namespace with the attributes ``sysml_file``, ``output``,
        ``verbose`` and ``silent``.
    """
    parser = argparse.ArgumentParser(
        description='Custom SysGit automation script'
    )
    parser.add_argument(
        'sysml_file',
        help='Path to the SysML file'
    )
    parser.add_argument(
        '-o', '--output',
        help='Output file path (default: overwrite input)',
        default=None
    )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Enable verbose logging'
    )
    parser.add_argument(
        '-s', '--silent',
        action='store_true',
        help='Silent mode (only errors)'
    )
    return parser.parse_args(argv)
def main():
    """Read the model, run the custom logic, write the result.

    Returns:
        0 when the model was read and written without error, 1 when any
        exception was raised along the way.
    """
    args = parse_args()

    # Configure logging level; --silent takes precedence over --verbose
    if args.silent:
        logger.setLevel(logging.ERROR)
    elif args.verbose:
        logger.setLevel(logging.DEBUG)

    try:
        # Read model
        logger.info(f"Reading: {args.sysml_file}")
        model = sg.read(args.sysml_file)

        # YOUR CUSTOM LOGIC HERE

        # Write output (falls back to overwriting the input file)
        output_file = args.output if args.output else args.sysml_file
        logger.info(f"Writing: {output_file}")
        model.write(output_file, overwrite=True)
        logger.info("Done!")
        return 0
    except Exception as e:
        # Top-level boundary: keep the same message but attach the traceback,
        # otherwise a failure inside the custom logic is very hard to locate.
        logger.exception(f"Error: {e}")
        return 1
# Script entry point: main()'s return value becomes the process exit status
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)
Webhooks and APIs#
Git Provider Webhooks#
GitLab Webhooks#
On GitLab, SysGit automation is triggered through CI/CD pipeline rules defined in the .gitlab-ci.yml configuration:
# Trigger on specific events
workflow:
  rules:
    # Run on push to main or release branches
    - if: '$CI_COMMIT_BRANCH == "main"'
    - if: '$CI_COMMIT_BRANCH =~ /^release\/.*/'
    # Run on merge requests
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
    # Run on tags
    - if: '$CI_COMMIT_TAG'
GitHub Webhooks#
For GitHub Actions integration:
name: SysGit Automation

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]
  schedule:
    - cron: '0 0 * * 0' # Weekly

jobs:
  update-model:
    runs-on: ubuntu-latest
    steps:
      # v2 of these actions is outdated; use the current major versions
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.9'
      - name: Install dependencies
        # Replace <SYSGIT_REPO_URL> with the actual repository URL
        run: pip install git+<SYSGIT_REPO_URL>
      - name: Run automation
        run: python automations/update_model.py sysgit/model.sysml
Event Triggers#
Commit-Based Triggers#
Trigger different automation based on commit message patterns:
stages:
  - validate
  - update
  - report

# Each job runs only when the commit message carries its bracketed tag
validate-model:
  stage: validate
  rules:
    - if: '$CI_COMMIT_MESSAGE =~ /\[validate\]/'
  script:
    - python3 automations/validate_model.py sysgit/model.sysml

update-verifications:
  stage: update
  rules:
    - if: '$CI_COMMIT_MESSAGE =~ /\[update-verifications\]/'
  script:
    - python3 automations/update_verifications.py sysgit/model.sysml

generate-report:
  stage: report
  rules:
    - if: '$CI_COMMIT_MESSAGE =~ /\[report\]/'
  script:
    - python3 automations/generate_report.py sysgit/model.sysml
  artifacts:
    paths:
      - reports/
Scheduled Triggers#
Run automation on a schedule rather than on every commit:
# GitLab scheduled pipeline
scheduled-checks:
  rules:
    # Only run when the pipeline was started by a schedule
    - if: '$CI_PIPELINE_SOURCE == "schedule"'
  script:
    - python3 automations/weekly_validation.py sysgit/model.sysml
    - python3 automations/generate_metrics.py sysgit/model.sysml
  artifacts:
    paths:
      - metrics/
    expire_in: 30 days
Configure schedules in GitLab: CI/CD → Schedules → New schedule
Custom Integrations#
REST API Integration#
Integrate with external REST APIs:
import sysgit as sg
import requests
import json
def fetch_external_data(api_url, api_key, timeout=30):
    """Fetch data from external REST API.

    Args:
        api_url: URL the GET request is sent to.
        api_key: Token sent as a Bearer credential in the Authorization header.
        timeout: Seconds to wait for the server. requests applies no timeout
            unless one is passed, so without it a stalled server would block
            the automation indefinitely.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (raised by ``raise_for_status``).
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }
    response = requests.get(api_url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()
def update_from_api(model, api_data):
    """Copy priority/status values from API data onto matching requirements.

    Args:
        model: Model whose RequirementUsage elements (selected with
            exclude_anon=True) are updated in place.
        api_data: Mapping keyed by requirement short name; each value is a
            mapping that may contain 'priority' and/or 'status'.
    """
    for requirement in model.select(sg.RequirementUsage, exclude_anon=True):
        short_name = requirement.get('short_name')
        if short_name not in api_data:
            continue
        # Update requirement attributes from API, but only the ones present
        for field in ('priority', 'status'):
            if field in api_data[short_name]:
                requirement.metadata[field] = api_data[short_name][field]
# Main script: load the model, pull the API data, apply it, save in place
model = sg.read("requirements.sysml")
api_data = fetch_external_data("https://api.example.com/requirements", api_key="your_api_key")
update_from_api(model, api_data)
model.write("requirements.sysml", overwrite=True)
Database Integration#
Connect to enterprise databases:
import sysgit as sg
import psycopg2
from psycopg2.extras import RealDictCursor
def fetch_from_database(db_config):
    """Fetch requirements from PostgreSQL database.

    Args:
        db_config: Keyword arguments forwarded to ``psycopg2.connect``.

    Returns:
        Dict mapping each row's ``req_id`` to the row converted to a plain
        dict (columns: req_id, description, priority, status).
    """
    conn = psycopg2.connect(**db_config)
    try:
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            cursor.execute("""
                SELECT req_id, description, priority, status
                FROM requirements
                WHERE active = true
            """)
            results = cursor.fetchall()
        finally:
            # Release the cursor even when the query fails
            cursor.close()
    finally:
        # Always give the connection back; previously a failing query leaked it
        conn.close()
    return {row['req_id']: dict(row) for row in results}
def sync_with_database(model, db_data):
    """Bring requirement text and metadata in line with database records.

    Args:
        model: Model whose RequirementUsage elements are updated in place.
        db_data: Mapping keyed by requirement short name; each value provides
            'description', 'priority' and 'status'.
    """
    for requirement in model.select(sg.RequirementUsage, exclude_anon=True):
        req_id = requirement.get('short_name')
        if req_id not in db_data:
            continue
        record = db_data[req_id]

        # Update text if changed
        new_text = record['description']
        if requirement.text != new_text:
            logger.info(f"Updating description for {req_id}")
            requirement.text = new_text

        # Update metadata (written unconditionally)
        requirement.metadata['priority'] = record['priority']
        requirement.metadata['status'] = record['status']
# Main script: load the model, read the database, sync, save in place
model = sg.read("requirements.sysml")
db_config = dict(
    host='db.example.com',
    database='requirements_db',
    user='readonly_user',
    password='secure_password',
)
db_data = fetch_from_database(db_config)
sync_with_database(model, db_data)
model.write("requirements.sysml", overwrite=True)
MQTT/Message Queue Integration#
For real-time event-driven automation:
import sysgit as sg
import paho.mqtt.client as mqtt
import json
class ModelUpdater:
    """Apply updates received over MQTT to a SysML model file."""

    def __init__(self, sysml_file):
        """Load the model that incoming messages will modify.

        Args:
            sysml_file: Path of the SysML file; also the path written back
                after every handled message.
        """
        self.sysml_file = sysml_file
        self.model = sg.read(sysml_file)

    def on_message(self, client, userdata, message):
        """Handle incoming MQTT messages.

        Messages on topics without a handler are ignored. Previously they
        still caused the model file to be rewritten and an "Updated" line to
        be logged although nothing had changed.
        """
        handlers = {
            "sysgit/verification/update": self.update_verification,
            "sysgit/requirement/update": self.update_requirement,
        }
        handler = handlers.get(message.topic)
        if handler is None:
            # Nothing to do for this topic; skip parsing and writing
            return

        try:
            payload = json.loads(message.payload)
            handler(payload)
            # Save changes
            self.model.write(self.sysml_file, overwrite=True)
            logger.info(f"Updated model from topic: {message.topic}")
        except Exception as e:
            # Keep the MQTT loop alive; a bad message must not stop it
            logger.error(f"Error processing message: {e}")

    def update_verification(self, data):
        """Update verification verdict from message.

        Args:
            data: Mapping with 'name' (matched against the quoted tail of the
                qualified name) and 'verdict' (the value to store).
        """
        quoted_name = f"'{data['name']}'"
        for v in self.model.select(sg.VerificationCaseUsage):
            if v.qualified_name.endswith(quoted_name):
                v["verdict"].value = data['verdict']

    def update_requirement(self, data):
        """Update requirement from message.

        Args:
            data: Mapping with 'id' (requirement short name) and 'status'.
        """
        for req in self.model.select(sg.RequirementUsage, exclude_anon=True):
            if req.get('short_name') == data['id']:
                req.metadata['status'] = data['status']
# Set up MQTT client
updater = ModelUpdater("requirements.sysml")
try:
    # paho-mqtt 2.x expects an explicit callback API version
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
except AttributeError:
    # paho-mqtt 1.x has no CallbackAPIVersion
    client = mqtt.Client()


def _subscribe_on_connect(client, userdata, *args):
    """Subscribe on every (re)connect so the subscription survives reconnects.

    ``*args`` absorbs the remaining callback arguments, whose number differs
    between callback API versions.
    """
    client.subscribe("sysgit/#")


client.on_connect = _subscribe_on_connect
client.on_message = updater.on_message
client.connect("mqtt.example.com", 1883)
client.loop_forever()
Scripting#
Batch Operations#
Bulk Metadata Updates#
Update metadata across multiple requirements:
import sysgit as sg
def bulk_update_metadata(model, updates):
    """Write metadata values onto every requirement listed in ``updates``.

    Args:
        model: SysGit model object
        updates: Dict mapping requirement IDs (short names) to dicts of
            metadata key/value pairs to set

    Returns:
        Number of requirements whose short name was found in ``updates``.
    """
    updated_count = 0
    for req in model.select(sg.RequirementUsage, exclude_anon=True):
        req_id = req.get('short_name')
        if req_id not in updates:
            continue
        for key, value in updates[req_id].items():
            req.metadata[key] = value
            logger.debug(f"Updated {req_id}.{key} = {value}")
        # Counted once per requirement, in line with the summary message
        updated_count += 1

    logger.info(f"Updated {updated_count} requirements")
    return updated_count
# Example usage: per-requirement metadata, keyed by short name
model = sg.read("requirements.sysml")
metadata_updates = {
    'REQ-001': dict(priority='high', owner='John Doe'),
    'REQ-002': dict(priority='medium', owner='Jane Smith'),
    'REQ-003': dict(priority='low', owner='Bob Johnson'),
}
bulk_update_metadata(model, metadata_updates)
model.write("requirements.sysml", overwrite=True)
Mass Verification Creation#
Automatically create verification cases for requirements:
import sysgit as sg
def create_verifications_for_requirements(model):
    """Report which requirements would need a new verification case.

    This is a simplified example - actual implementation would need
    to generate proper SysML v2 syntax. Nothing is added to the model;
    the function only logs the names it would create.

    Returns:
        Number of requirements considered unverified.
    """
    requirements = model.select(sg.RequirementUsage, exclude_anon=True)
    verifications = model.select(sg.VerificationCaseUsage)

    # Build set of verified requirements
    verified = set()
    for _verification in verifications:
        # Extract verified requirements from verify statements
        # (implementation depends on model structure)
        pass

    # Identify requirements without verification
    unverified = [req for req in requirements if req.qualified_name not in verified]
    logger.info(f"Found {len(unverified)} unverified requirements")

    # Create verification cases
    for req in unverified:
        # Last segment of the qualified name
        req_name = req.qualified_name.rsplit("::", 1)[-1]
        verification_name = f"Verify {req_name}"
        # Note: Actual creation would require building proper SysML v2 syntax
        logger.info(f"Would create verification: {verification_name}")

    return len(unverified)
model = sg.read("requirements.sysml")
count = create_verifications_for_requirements(model)
# The function above only reports what it would create, so do not claim
# that verifications were created.
logger.info(f"Found {count} requirements needing new verifications")
Batch Status Updates#
Update status across multiple elements based on external data:
import sysgit as sg
import pandas as pd
def bulk_status_update(model, status_file):
    """Update element statuses from CSV file.

    Args:
        model: Model whose RequirementUsage elements are updated in place.
        status_file: CSV path (or anything ``pandas.read_csv`` accepts) with
            at least the columns 'req_id' and 'status'.

    Returns:
        Number of requirements whose status was set.
    """
    # Read status updates from CSV
    status_df = pd.read_csv(status_file)

    # Build the lookup once instead of filtering the whole frame for every
    # requirement. keep='first' preserves the old "first matching row wins".
    status_by_id = (
        status_df.drop_duplicates(subset='req_id', keep='first')
        .set_index('req_id')['status']
        .to_dict()
    )

    requirements = model.select(sg.RequirementUsage, exclude_anon=True)
    updated = 0
    for req in requirements:
        req_id = req.get('short_name')
        if req_id not in status_by_id:
            continue
        new_status = status_by_id[req_id]
        req.metadata['status'] = new_status
        updated += 1
        logger.debug(f"Updated {req_id} status to {new_status}")

    logger.info(f"Updated status for {updated} requirements")
    return updated
# Load, apply the CSV statuses, save in place
model = sg.read("requirements.sysml")
bulk_status_update(model, status_file="status_updates.csv")
model.write("requirements.sysml", overwrite=True)
Custom Validation Rules#
Requirement Text Quality Checks#
Enforce text quality standards:
import sysgit as sg
import re
def validate_requirement_text(req):
    """Validate requirement text quality.

    Args:
        req: Object whose optional ``text`` attribute holds the requirement
            text. A missing attribute or a ``None`` value is treated as
            empty text (``None`` previously raised TypeError in ``len``).

    Returns:
        List of issue descriptions; empty when no check fired.
    """
    issues = []
    text = getattr(req, 'text', None) or ""

    # Check minimum length
    if len(text) < 20:
        issues.append("Text too short (< 20 characters)")

    # Check for passive voice
    passive_patterns = [
        r'\bwill be\b', r'\bshall be\b', r'\bis \w+ed\b'
    ]
    for pattern in passive_patterns:
        if re.search(pattern, text, re.IGNORECASE):
            issues.append(f"Contains passive voice: {pattern}")

    # Check for ambiguous terms (plain substring match, case-insensitive)
    ambiguous_terms = ['appropriate', 'adequate', 'sufficient', 'reasonable']
    lowered = text.lower()
    for term in ambiguous_terms:
        if term in lowered:
            issues.append(f"Contains ambiguous term: {term}")

    # Check for quantifiable criteria (any digit counts)
    if not re.search(r'\d+', text):
        issues.append("No quantifiable criteria found")

    return issues
def validate_all_requirements(model):
    """Collect text-quality issues for every requirement that has any.

    Returns:
        Dict mapping a requirement's qualified name to its list of issues;
        requirements without issues are left out.
    """
    validation_report = {}
    for requirement in model.select(sg.RequirementUsage, exclude_anon=True):
        found = validate_requirement_text(requirement)
        if not found:
            continue
        validation_report[requirement.qualified_name] = found
    return validation_report
# Run validation and log one entry per requirement with issues
model = sg.read("requirements.sysml")
report = validate_all_requirements(model)
if not report:
    logger.info("All requirements passed validation")
else:
    logger.warning(f"Validation found {len(report)} requirements with issues:")
    for req_name, issues in report.items():
        logger.warning(f" {req_name}:")
        for issue in issues:
            logger.warning(f" - {issue}")
Naming Convention Enforcement#
Enforce organizational naming standards:
import sysgit as sg
import re
def validate_naming_conventions(model):
    """Enforce naming conventions for requirements.

    Checks performed per requirement:
      * short name, when present, must be exactly ``REQ-ddd-ddd``;
      * declared name, when present, may only contain letters, digits,
        whitespace and hyphens, and must equal its ``str.title()`` form.

    Returns:
        List of violation dicts. Each has 'requirement' and 'issue', plus
        either 'expected' (short-name check) or 'name' (declared-name checks).
    """
    requirements = model.select(sg.RequirementUsage, exclude_anon=True)
    violations = []

    for req in requirements:
        short_name = req.get('short_name')
        declared_name = req.get('declared_name')

        # Check short name format (e.g., REQ-XXX-YYY).
        # fullmatch is used because match() with '$' also accepts a trailing
        # newline, which would let "REQ-001-002\n" through.
        if short_name and not re.fullmatch(r'REQ-\d{3}-\d{3}', short_name):
            violations.append({
                'requirement': req.qualified_name,
                'issue': f"Invalid short name format: {short_name}",
                'expected': "REQ-XXX-YYY"
            })

        # Check declared name conventions
        if declared_name:
            # Should not contain special characters
            if re.search(r'[^a-zA-Z0-9\s\-]', declared_name):
                violations.append({
                    'requirement': req.qualified_name,
                    'issue': "Declared name contains special characters",
                    'name': declared_name
                })
            # Should be title case
            if declared_name != declared_name.title():
                violations.append({
                    'requirement': req.qualified_name,
                    'issue': "Declared name not in title case",
                    'name': declared_name
                })

    return violations
# Check naming and fail the run when any violation is found
model = sg.read("requirements.sysml")
violations = validate_naming_conventions(model)
if not violations:
    logger.info("All naming conventions satisfied")
else:
    logger.error(f"Found {len(violations)} naming violations:")
    for v in violations:
        logger.error(f" {v['requirement']}: {v['issue']}")
    sys.exit(1) # Fail the pipeline
Traceability Validation#
Ensure complete traceability:
import sysgit as sg
def validate_traceability(model):
    """Measure how many requirements are referenced by a verification.

    Returns:
        Tuple ``(coverage, unverified)``: coverage as a percentage (0 when
        there are no requirements) and the qualified names of requirements
        not found among the verified targets.
    """
    requirements = model.select(sg.RequirementUsage, exclude_anon=True)
    verifications = model.select(sg.VerificationCaseUsage)

    # Build set of verified requirements
    # This is simplified - actual implementation depends on model structure
    # NOTE(review): targets are compared against qualified-name strings
    # below - confirm that verify_stmt.target has that form.
    verified_reqs = set()
    for case in verifications:
        if not hasattr(case, 'objective'):
            continue
        objective = case['objective']
        if not hasattr(objective, 'verify'):
            continue
        for verify_stmt in objective.verify:
            verified_reqs.add(verify_stmt.target)

    # Find unverified requirements
    all_names = (req.qualified_name for req in requirements)
    unverified = [name for name in all_names if name not in verified_reqs]

    # Calculate coverage
    total = len(requirements)
    verified = total - len(unverified)
    if total > 0:
        coverage = verified / total * 100
    else:
        coverage = 0
    logger.info(f"Verification coverage: {coverage:.1f}% ({verified}/{total})")

    if unverified:
        logger.warning(f"Unverified requirements ({len(unverified)}):")
        for req_name in unverified:
            logger.warning(f" - {req_name}")

    return coverage, unverified
model = sg.read("requirements.sysml")
coverage, unverified = validate_traceability(model)

# Fail pipeline if coverage below threshold
COVERAGE_THRESHOLD = 95.0
if COVERAGE_THRESHOLD > coverage:
    logger.error(f"Verification coverage {coverage:.1f}% below threshold {COVERAGE_THRESHOLD}%")
    sys.exit(1)
Model Consistency Checks#
Validate internal model consistency:
import sysgit as sg
def check_model_consistency(model):
    """Look for duplicate short names and requirements lacking metadata.

    Returns:
        List of issue dicts. A 'duplicate_id' issue carries 'id' and
        'locations' (first holder of the short name and the clashing
        requirement); a 'missing_metadata' issue carries 'requirement'.
    """
    issues = []
    requirements = model.select(sg.RequirementUsage, exclude_anon=True)
    verifications = model.select(sg.VerificationCaseUsage)

    # Check for duplicate short names; remember who used each one first
    first_holder = {}
    for req in requirements:
        short_name = req.get('short_name')
        if not short_name:
            continue
        if short_name not in first_holder:
            first_holder[short_name] = req.qualified_name
        else:
            issues.append({
                'type': 'duplicate_id',
                'id': short_name,
                'locations': [first_holder[short_name], req.qualified_name]
            })

    # Check for orphaned references
    all_req_names = {req.qualified_name for req in requirements}
    for _verification in verifications:
        # Check if verified requirements exist
        # (implementation depends on model structure)
        pass

    # Check for missing metadata
    for req in requirements:
        if hasattr(req, 'metadata'):
            continue
        issues.append({
            'type': 'missing_metadata',
            'requirement': req.qualified_name
        })

    return issues
# Run the checks and fail the run when anything is reported
model = sg.read("requirements.sysml")
issues = check_model_consistency(model)
if not issues:
    logger.info("Model consistency checks passed")
else:
    logger.error(f"Found {len(issues)} consistency issues:")
    for issue in issues:
        logger.error(f" {issue['type']}: {issue}")
    sys.exit(1)
Advanced Patterns#
Multi-File Processing#
Process multiple SysML files in a repository:
import sysgit as sg
import glob
import os
def process_all_sysml_files(directory, operation):
    """Run ``operation`` on every .sysml file under ``directory``.

    Each file is read, passed to ``operation(model, filepath)`` and written
    back in place. A failure on one file is logged and recorded, and the
    remaining files are still processed.

    Returns:
        List with one dict per file: 'file', 'status' ('success' or
        'failed'), and either 'result' or 'error'.
    """
    pattern = os.path.join(directory, "**/*.sysml")
    sysml_files = glob.glob(pattern, recursive=True)
    logger.info(f"Found {len(sysml_files)} SysML files")

    results = []
    for filepath in sysml_files:
        try:
            logger.info(f"Processing: {filepath}")
            model = sg.read(filepath)
            result = operation(model, filepath)
            model.write(filepath, overwrite=True)
        except Exception as e:
            logger.error(f"Error processing {filepath}: {e}")
            outcome = {'file': filepath, 'error': str(e), 'status': 'failed'}
        else:
            outcome = {'file': filepath, 'result': result, 'status': 'success'}
        results.append(outcome)

    return results
def update_all_verifications(model, filepath):
    """Example operation: count the verification cases in ``model``.

    ``filepath`` is accepted to fit the operation signature and is not used.
    """
    # Apply updates...
    return len(model.select(sg.VerificationCaseUsage))
# Process all files
results = process_all_sysml_files("sysgit/", update_all_verifications)

# Summary: tally outcomes by status
success = len([r for r in results if r['status'] == 'success'])
failed = len([r for r in results if r['status'] == 'failed'])
logger.info(f"Processed {success} files successfully, {failed} failed")
Conditional Updates#
Only update model if specific conditions are met:
import sysgit as sg
from datetime import datetime, timedelta
def conditional_update(model, conditions):
    """Run the update only when every condition holds.

    Returns:
        True when the conditions were met (update path taken), else False.
    """
    if all_conditions_met(model, conditions):
        # Proceed with update
        logger.info("Conditions satisfied, proceeding with update")
        # ... perform updates ...
        return True

    logger.info("Conditions not met, skipping update")
    return False
def all_conditions_met(model, conditions):
    """Evaluate the configured conditions against ``model``.

    Args:
        model: Model handed to the coverage calculation.
        conditions: Mapping that may contain 'min_coverage' (minimum
            coverage percentage) and 'max_age_days' (not evaluated yet).

    Returns:
        False when coverage is below 'min_coverage', otherwise True.
    """
    if 'min_coverage' in conditions:
        # NOTE(review): calculate_coverage is not defined in this snippet;
        # it has to be supplied by the surrounding script.
        coverage = calculate_coverage(model)
        minimum = conditions['min_coverage']
        if coverage < minimum:
            logger.warning(f"Coverage {coverage}% below minimum {conditions['min_coverage']}%")
            return False

    if 'max_age_days' in conditions:
        # Check model freshness based on metadata
        # (implementation depends on your metadata structure)
        pass

    return True
# Usage: write the model back only when the update actually ran
model = sg.read("requirements.sysml")
conditions = dict(min_coverage=90.0, max_age_days=30)
if conditional_update(model, conditions):
    model.write("requirements.sysml", overwrite=True)
Incremental Processing#
Track what's been processed to avoid redundant work:
import sysgit as sg
import json
import hashlib
def get_model_hash(model):
    """Calculate hash of model content.

    Args:
        model: Object whose ``code`` attribute is the model text (str).

    Returns:
        Hex-encoded SHA-256 digest of ``model.code`` encoded as UTF-8.
    """
    # Encoding spelled out so the digest never depends on defaults
    return hashlib.sha256(model.code.encode('utf-8')).hexdigest()
def load_processing_state(state_file):
    """Load previous processing state.

    Args:
        state_file: Path of the JSON state file.

    Returns:
        The decoded state, or an empty dict when the file does not exist or
        does not contain valid JSON (e.g. it was truncated by an interrupted
        write). An empty state simply causes the model to be processed again.
    """
    try:
        with open(state_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError:
        # A corrupt state file must not block every future run
        return {}
def save_processing_state(state_file, state):
    """Save processing state.

    The state is written to a temporary sibling file and then moved over
    ``state_file``, so an interrupted run cannot leave a half-written file.

    Args:
        state_file: Path of the JSON state file to create or replace.
        state: JSON-serializable mapping to store.
    """
    import os

    tmp_file = f"{state_file}.tmp"
    with open(tmp_file, 'w', encoding='utf-8') as f:
        json.dump(state, f, indent=2)
    # os.replace swaps the file in a single step on the same filesystem
    os.replace(tmp_file, state_file)
def incremental_process(sysml_file, state_file, operation):
    """Only process if model has changed.

    Args:
        sysml_file: Path of the SysML file to read and write back.
        state_file: Path of the JSON file holding the last processed hash.
        operation: Callable taking the model; its return value is stored in
            the state file and therefore must be JSON-serializable.

    Returns:
        True when the model was processed, False when it was skipped because
        its hash matches the stored one.
    """
    # Imported here because this snippet's import list does not include it
    from datetime import datetime

    model = sg.read(sysml_file)
    current_hash = get_model_hash(model)

    state = load_processing_state(state_file)
    last_hash = state.get('last_hash')

    if current_hash == last_hash:
        logger.info("Model unchanged since last processing, skipping")
        return False

    logger.info("Model changed, processing")
    result = operation(model)

    # Write the model first: if this fails, the state must not claim the
    # file was processed.
    model.write(sysml_file, overwrite=True)

    # Update state. The hash is taken after the operation so that it
    # describes the content just written; storing the pre-operation hash
    # would make the next run see its own output as a change.
    # NOTE(review): assumes model.code reflects changes made by operation -
    # if it does not, this is the same hash as before the operation.
    state['last_hash'] = get_model_hash(model)
    state['last_processed'] = datetime.now().isoformat()
    state['result'] = result
    save_processing_state(state_file, state)

    return True
# Usage
# NOTE(review): update_verifications is not defined in this snippet; it has
# to be supplied by the surrounding script.
incremental_process(
    sysml_file="requirements.sysml",
    state_file=".processing_state.json",
    operation=lambda m: update_verifications(m),
)
Integration with External Tools#
DOORS Integration#
Export/import between IBM DOORS and SysGit:
import sysgit as sg
import csv
def export_to_doors_csv(model, output_file):
    """Export requirements to DOORS-compatible CSV.

    Writes a header row (Object ID, Object Text, Priority, Status) followed
    by one row per requirement.

    Args:
        model: Model whose RequirementUsage elements are exported.
        output_file: Path of the CSV file to create or overwrite.
    """
    requirements = model.select(sg.RequirementUsage, exclude_anon=True)

    # Explicit encoding: the platform default would make the export differ
    # between machines and can fail on non-ASCII requirement text.
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['Object ID', 'Object Text', 'Priority', 'Status'])
        for req in requirements:
            writer.writerow([
                req.get('short_name', ''),
                req.text if hasattr(req, 'text') else '',
                req.metadata.get('priority', ''),
                req.metadata.get('status', '')
            ])

    logger.info(f"Exported {len(requirements)} requirements to {output_file}")
def import_from_doors_csv(model, input_file):
    """Import requirement updates from DOORS CSV.

    Args:
        model: Model whose RequirementUsage elements are updated in place.
        input_file: CSV path with the columns 'Object ID', 'Object Text',
            'Priority' and 'Status'.

    Returns:
        Number of requirements whose short name matched an 'Object ID'.
    """
    updates = {}
    # newline='' is what the csv module requires so that line breaks inside
    # quoted fields are read correctly. 'utf-8-sig' accepts files with or
    # without a BOM; a BOM would otherwise end up in the first header name
    # and break the 'Object ID' lookup.
    with open(input_file, 'r', newline='', encoding='utf-8-sig') as f:
        reader = csv.DictReader(f)
        for row in reader:
            updates[row['Object ID']] = {
                'text': row['Object Text'],
                'priority': row['Priority'],
                'status': row['Status']
            }

    requirements = model.select(sg.RequirementUsage, exclude_anon=True)
    updated_count = 0
    for req in requirements:
        req_id = req.get('short_name')
        if req_id not in updates:
            continue
        update = updates[req_id]
        req.text = update['text']
        req.metadata['priority'] = update['priority']
        req.metadata['status'] = update['status']
        updated_count += 1

    logger.info(f"Updated {updated_count} requirements from DOORS")
    return updated_count
# Usage: export for DOORS, then read the edited file back into the model
model = sg.read("requirements.sysml")
export_to_doors_csv(model, output_file="requirements_export.csv")
# ... edit in DOORS ...
import_from_doors_csv(model, input_file="requirements_updated.csv")
model.write("requirements.sysml", overwrite=True)
Jira Integration#
Sync requirements with Jira issues:
import sysgit as sg
import requests
def sync_with_jira(model, jira_config):
    """Synchronize requirements with Jira issues.

    Requirements are linked to issues through their 'jira_key' metadata
    entry. For every linked requirement whose issue is in the search result,
    'status' is set from the issue, and 'assignee' is set when the issue has
    one (an unassigned issue previously raised TypeError).

    Args:
        jira_config: Mapping with 'url', 'username', 'api_token' and 'jql';
            an optional 'timeout' (seconds, default 30) bounds the request.

    Returns:
        Number of requirements that carry a 'jira_key', whether or not a
        matching issue was found.

    Raises:
        requests.HTTPError: If Jira answers with an error status.
    """
    # Fetch issues from Jira
    # NOTE(review): a single request is made; if the search endpoint
    # paginates, issues beyond the first page are not seen.
    response = requests.get(
        f"{jira_config['url']}/rest/api/2/search",
        params={'jql': jira_config['jql']},
        auth=(jira_config['username'], jira_config['api_token']),
        timeout=jira_config.get('timeout', 30),
    )
    # Fail on an auth/query error instead of a KeyError on 'issues' below
    response.raise_for_status()

    # Index once by key; setdefault keeps the first issue for a given key
    issues_by_key = {}
    for issue in response.json()['issues']:
        issues_by_key.setdefault(issue['key'], issue)

    requirements = model.select(sg.RequirementUsage, exclude_anon=True)
    linked_count = 0
    for req in requirements:
        jira_key = req.metadata.get('jira_key')
        if not jira_key:
            continue
        linked_count += 1

        issue = issues_by_key.get(jira_key)
        if issue is None:
            continue

        # Update requirement from Jira
        fields = issue['fields']
        req.metadata['status'] = fields['status']['name']
        assignee = fields.get('assignee')
        if assignee:
            req.metadata['assignee'] = assignee['displayName']
        logger.debug(f"Updated {req.qualified_name} from Jira {jira_key}")

    return linked_count
# Usage: connection details and the JQL query selecting the issues to sync
model = sg.read("requirements.sysml")
jira_config = dict(
    url='https://your-domain.atlassian.net',
    username='user@example.com',
    api_token='your_api_token',
    jql='project = SYSENG AND type = Requirement',
)
updated = sync_with_jira(model, jira_config)
logger.info(f"Synchronized {updated} requirements with Jira")
model.write("requirements.sysml", overwrite=True)
See Also#
- CI/CD Overview - General CI/CD concepts and architecture
- Automated Verification - Test system integration
- SysML v2 Language Reference - Language syntax