Skip to content

Draft Documentation

This page is currently under development. Content may be incomplete or subject to change.
This page makes extensive use of AI-generated documentation.

Custom Automation#

SysGit's Python object model enables custom automation scripts tailored to your organization's specific workflows, tools, and processes. This section covers building custom integrations, batch operations, validation rules, and advanced automation patterns.

Python Script Template#

All custom automation scripts should follow this general structure:

import sysgit as sg
import argparse
import logging
import sys

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def parse_args(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings. Defaults to None, in
            which case argparse reads from ``sys.argv`` — so existing
            ``parse_args()`` callers are unaffected.

    Returns:
        argparse.Namespace with ``sysml_file``, ``output``, ``verbose``
        and ``silent`` attributes.
    """
    parser = argparse.ArgumentParser(
        description='Custom SysGit automation script'
    )
    parser.add_argument(
        'sysml_file',
        help='Path to the SysML file'
    )
    parser.add_argument(
        '-o', '--output',
        help='Output file path (default: overwrite input)',
        default=None
    )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Enable verbose logging'
    )
    parser.add_argument(
        '-s', '--silent',
        action='store_true',
        help='Silent mode (only errors)'
    )
    # Accepting argv explicitly makes the function testable without
    # touching the process-global sys.argv.
    return parser.parse_args(argv)

def main():
    """Script entry point: read, transform, and write back a SysML model.

    Returns:
        Process exit code: 0 on success, 1 on any error.
    """
    args = parse_args()

    # Configure logging level; --silent takes precedence over --verbose
    # when both flags are supplied.
    if args.silent:
        logger.setLevel(logging.ERROR)
    elif args.verbose:
        logger.setLevel(logging.DEBUG)

    try:
        # Read model
        logger.info(f"Reading: {args.sysml_file}")
        model = sg.read(args.sysml_file)

        # YOUR CUSTOM LOGIC HERE

        # Write output; by default the input file is overwritten in place.
        output_file = args.output if args.output else args.sysml_file
        logger.info(f"Writing: {output_file}")
        model.write(output_file, overwrite=True)

        logger.info("Done!")
        return 0

    except Exception as e:
        # Broad catch is deliberate at this CLI boundary: report the
        # error and exit non-zero so CI pipelines fail visibly.
        logger.error(f"Error: {e}")
        return 1

if __name__ == "__main__":
    sys.exit(main())

Webhooks and APIs#

Git Provider Webhooks#

GitLab Webhooks#

SysGit leverages GitLab's built-in webhook system through .gitlab-ci.yml configuration:

# Trigger on specific events
# Rules are evaluated top-down; the pipeline runs when any rule matches.
workflow:
  rules:
    # Run on push to main or release branches
    - if: '$CI_COMMIT_BRANCH == "main"'
    - if: '$CI_COMMIT_BRANCH =~ /^release\/.*/'
    # Run on merge requests
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
    # Run on tags
    - if: '$CI_COMMIT_TAG'

GitHub Webhooks#

For GitHub Actions integration:

name: SysGit Automation

# Run on pushes to main/develop, PRs targeting main, and a weekly schedule.
on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]
  schedule:
    - cron: '0 0 * * 0'  # Weekly (Sundays at 00:00 UTC)

jobs:
  update-model:
    runs-on: ubuntu-latest
    steps:
      # v2 of these actions targets deprecated Node runtimes and is
      # rejected by current GitHub-hosted runners; use supported releases.
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: pip install git+<SYSGIT_REPO_URL>
      - name: Run automation
        run: python automations/update_model.py sysgit/model.sysml

Event Triggers#

Commit-Based Triggers#

Trigger different automation based on commit message patterns:

# Jobs run selectively based on markers in the commit message, e.g.
# "fix units [validate]" triggers only the validate-model job.
stages:
  - validate
  - update
  - report

validate-model:
  stage: validate
  rules:
    - if: '$CI_COMMIT_MESSAGE =~ /\[validate\]/'
  script:
    - python3 automations/validate_model.py sysgit/model.sysml

update-verifications:
  stage: update
  rules:
    - if: '$CI_COMMIT_MESSAGE =~ /\[update-verifications\]/'
  script:
    - python3 automations/update_verifications.py sysgit/model.sysml

generate-report:
  stage: report
  rules:
    - if: '$CI_COMMIT_MESSAGE =~ /\[report\]/'
  script:
    - python3 automations/generate_report.py sysgit/model.sysml
  artifacts:
    # Generated reports are kept as downloadable pipeline artifacts.
    paths:
      - reports/

Scheduled Triggers#

Run automation on a schedule rather than on every commit:

# GitLab scheduled pipeline
# Runs only for pipelines started by a schedule (CI/CD -> Schedules),
# never for ordinary pushes or merge requests.
scheduled-checks:
  rules:
    - if: '$CI_PIPELINE_SOURCE == "schedule"'
  script:
    - python3 automations/weekly_validation.py sysgit/model.sysml
    - python3 automations/generate_metrics.py sysgit/model.sysml
  artifacts:
    paths:
      - metrics/
    # Old metric artifacts are pruned automatically after 30 days.
    expire_in: 30 days

Configure schedules in GitLab: CI/CD → Schedules → New schedule

Custom Integrations#

REST API Integration#

Integrate with external REST APIs:

import sysgit as sg
import requests
import json

def fetch_external_data(api_url, api_key):
    """Fetch JSON data from an external REST API.

    Args:
        api_url: Fully qualified endpoint URL.
        api_key: Bearer token used for authorization.

    Returns:
        Parsed JSON response body.

    Raises:
        requests.HTTPError: If the server returns a 4xx/5xx status.
        requests.Timeout: If the server does not respond in time.
    """
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }

    # requests has no default timeout; without one an unresponsive
    # server would hang the automation pipeline indefinitely.
    response = requests.get(api_url, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()

def update_from_api(model, api_data):
    """Copy priority/status fields from API records onto matching requirements.

    Args:
        model: SysGit model object.
        api_data: Dict keyed by requirement short name; values are dicts
            that may carry 'priority' and/or 'status'.
    """
    for req in model.select(sg.RequirementUsage, exclude_anon=True):
        record = api_data.get(req.get('short_name'))
        if record is None:
            continue
        # Only the fields present in the API record are written.
        for field in ('priority', 'status'):
            if field in record:
                req.metadata[field] = record[field]

# Main script
# SECURITY: do not hard-code API keys in committed scripts; read them
# from an environment variable or CI secret instead.
model = sg.read("requirements.sysml")
api_data = fetch_external_data(
    "https://api.example.com/requirements",
    api_key="your_api_key"
)
update_from_api(model, api_data)
model.write("requirements.sysml", overwrite=True)

Database Integration#

Connect to enterprise databases:

import sysgit as sg
import psycopg2
from psycopg2.extras import RealDictCursor

def fetch_from_database(db_config):
    """Fetch active requirements from a PostgreSQL database.

    Args:
        db_config: Mapping of psycopg2 connection keyword arguments
            (host, database, user, password, ...).

    Returns:
        Dict mapping req_id to a dict with req_id/description/priority/status.
    """
    conn = psycopg2.connect(**db_config)
    try:
        # The cursor context manager plus try/finally guarantee both the
        # cursor and the connection are released even when the query
        # raises; the original leaked the connection on any exception.
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT req_id, description, priority, status
                FROM requirements
                WHERE active = true
            """)
            results = cursor.fetchall()
    finally:
        conn.close()

    return {row['req_id']: dict(row) for row in results}

def sync_with_database(model, db_data):
    """Synchronize model requirements with their database records.

    Args:
        model: SysGit model object.
        db_data: Dict keyed by requirement short name with
            'description', 'priority' and 'status' fields.
    """
    for req in model.select(sg.RequirementUsage, exclude_anon=True):
        req_id = req.get('short_name')
        record = db_data.get(req_id)
        if record is None:
            continue

        # Rewrite the text only when it actually differs from the DB.
        if req.text != record['description']:
            logger.info(f"Updating description for {req_id}")
            req.text = record['description']

        # Priority and status are always taken from the database record.
        req.metadata['priority'] = record['priority']
        req.metadata['status'] = record['status']

# Main script
model = sg.read("requirements.sysml")
# SECURITY: database credentials belong in environment variables or a
# secrets manager, never in committed source.
db_config = {
    'host': 'db.example.com',
    'database': 'requirements_db',
    'user': 'readonly_user',
    'password': 'secure_password'
}
db_data = fetch_from_database(db_config)
sync_with_database(model, db_data)
model.write("requirements.sysml", overwrite=True)

MQTT/Message Queue Integration#

For real-time event-driven automation:

import sysgit as sg
import paho.mqtt.client as mqtt
import json

class ModelUpdater:
    """Applies MQTT-driven updates to a SysML model file.

    The model is read once at construction; every handled message
    mutates the in-memory model and rewrites the file to disk.
    """

    def __init__(self, sysml_file):
        # Path is retained so on_message can write changes back to disk.
        self.sysml_file = sysml_file
        self.model = sg.read(sysml_file)

    def on_message(self, client, userdata, message):
        """Handle incoming MQTT messages."""
        try:
            payload = json.loads(message.payload)

            # Dispatch on topic; unknown topics are ignored but still
            # trigger the (no-op) rewrite below.
            if message.topic == "sysgit/verification/update":
                self.update_verification(payload)
            elif message.topic == "sysgit/requirement/update":
                self.update_requirement(payload)

            # Save changes
            self.model.write(self.sysml_file, overwrite=True)
            logger.info(f"Updated model from topic: {message.topic}")

        except Exception as e:
            # Broad catch keeps the MQTT loop alive on malformed payloads.
            logger.error(f"Error processing message: {e}")

    def update_verification(self, data):
        """Update verification verdict from message.

        Expects ``data`` to carry 'name' and 'verdict' keys.
        """
        verifications = self.model.select(sg.VerificationCaseUsage)
        for v in verifications:
            # Match on the quoted-name suffix of the qualified name.
            if v.qualified_name.endswith(f"'{data['name']}'"):
                v["verdict"].value = data['verdict']

    def update_requirement(self, data):
        """Update requirement from message.

        Expects ``data`` to carry 'id' and 'status' keys.
        """
        requirements = self.model.select(sg.RequirementUsage, exclude_anon=True)
        for req in requirements:
            if req.get('short_name') == data['id']:
                req.metadata['status'] = data['status']

# Set up MQTT client
# NOTE(review): paho-mqtt 2.x requires a CallbackAPIVersion argument to
# mqtt.Client(); this call matches the 1.x API — confirm the installed
# paho-mqtt version.
updater = ModelUpdater("requirements.sysml")
client = mqtt.Client()
client.on_message = updater.on_message
client.connect("mqtt.example.com", 1883)
client.subscribe("sysgit/#")
client.loop_forever()

Scripting#

Batch Operations#

Bulk Metadata Updates#

Update metadata across multiple requirements:

import sysgit as sg

def bulk_update_metadata(model, updates):
    """Apply metadata updates to multiple requirements.

    Args:
        model: SysGit model object
        updates: Dict mapping requirement IDs to metadata updates

    Returns:
        Number of requirements that matched an entry in ``updates``.
    """
    updated_count = 0
    for req in model.select(sg.RequirementUsage, exclude_anon=True):
        req_id = req.get('short_name')
        fields = updates.get(req_id)
        if fields is None:
            continue
        for key, value in fields.items():
            req.metadata[key] = value
            logger.debug(f"Updated {req_id}.{key} = {value}")
        updated_count += 1

    logger.info(f"Updated {updated_count} requirements")
    return updated_count

# Example usage
model = sg.read("requirements.sysml")

# Maps requirement short names to the metadata fields to set on them.
metadata_updates = {
    'REQ-001': {'priority': 'high', 'owner': 'John Doe'},
    'REQ-002': {'priority': 'medium', 'owner': 'Jane Smith'},
    'REQ-003': {'priority': 'low', 'owner': 'Bob Johnson'},
}

bulk_update_metadata(model, metadata_updates)
model.write("requirements.sysml", overwrite=True)

Mass Verification Creation#

Automatically create verification cases for requirements:

import sysgit as sg

def create_verifications_for_requirements(model):
    """Create verification cases for requirements lacking them.

    This is a simplified example - actual implementation would need
    to generate proper SysML v2 syntax.

    Args:
        model: SysGit model object.

    Returns:
        Number of requirements currently lacking a verification case.
    """
    requirements = model.select(sg.RequirementUsage, exclude_anon=True)
    verifications = model.select(sg.VerificationCaseUsage)

    # Build set of verified requirements
    verified = set()
    for v in verifications:
        # Extract verified requirements from verify statements
        # (implementation depends on model structure)
        pass

    # Identify requirements without verification
    # NOTE: `verified` stays empty with the placeholder loop above, so
    # every requirement is reported as unverified until it is filled in.
    unverified = [r for r in requirements if r.qualified_name not in verified]

    logger.info(f"Found {len(unverified)} unverified requirements")

    # Create verification cases
    for req in unverified:
        req_name = req.qualified_name.split("::")[-1]
        verification_name = f"Verify {req_name}"

        # Note: Actual creation would require building proper SysML v2 syntax
        logger.info(f"Would create verification: {verification_name}")

    return len(unverified)

model = sg.read("requirements.sysml")
count = create_verifications_for_requirements(model)
# NOTE(review): the function only counts and logs; nothing is actually
# created yet, so "Created" in this message is aspirational.
logger.info(f"Created {count} new verifications")

Batch Status Updates#

Update status across multiple elements based on external data:

import sysgit as sg
import pandas as pd

def bulk_status_update(model, status_file):
    """Update element statuses from CSV file.

    Args:
        model: SysGit model object.
        status_file: Path to a CSV with 'req_id' and 'status' columns.

    Returns:
        Number of requirements whose status was updated.
    """
    status_df = pd.read_csv(status_file)

    updated = 0
    for req in model.select(sg.RequirementUsage, exclude_anon=True):
        req_id = req.get('short_name')

        # Pick the first CSV row matching this requirement, if any.
        matches = status_df[status_df['req_id'] == req_id]
        if matches.empty:
            continue
        new_status = matches.iloc[0]['status']
        req.metadata['status'] = new_status
        updated += 1
        logger.debug(f"Updated {req_id} status to {new_status}")

    logger.info(f"Updated status for {updated} requirements")
    return updated

# Apply status updates from the CSV and persist the model in place.
model = sg.read("requirements.sysml")
bulk_status_update(model, "status_updates.csv")
model.write("requirements.sysml", overwrite=True)

Custom Validation Rules#

Requirement Text Quality Checks#

Enforce text quality standards:

import sysgit as sg
import re

def validate_requirement_text(req):
    """Validate requirement text quality.

    Args:
        req: Requirement object; its optional ``text`` attribute is
            inspected. A missing attribute or a ``None`` value is
            treated as empty text (the original raised ``TypeError``
            on ``req.text is None``).

    Returns:
        List of human-readable issue strings; empty when the text
        passes every check.
    """
    issues = []

    # Normalize missing/None text to the empty string.
    text = getattr(req, 'text', None) or ""

    # Check minimum length
    if len(text) < 20:
        issues.append("Text too short (< 20 characters)")

    # Check for passive voice
    passive_patterns = [
        r'\bwill be\b', r'\bshall be\b', r'\bis \w+ed\b'
    ]
    for pattern in passive_patterns:
        if re.search(pattern, text, re.IGNORECASE):
            issues.append(f"Contains passive voice: {pattern}")

    # Check for ambiguous terms
    ambiguous_terms = ['appropriate', 'adequate', 'sufficient', 'reasonable']
    for term in ambiguous_terms:
        if term in text.lower():
            issues.append(f"Contains ambiguous term: {term}")

    # Requirements should carry at least one number (quantifiable criteria).
    if not re.search(r'\d+', text):
        issues.append("No quantifiable criteria found")

    return issues

def validate_all_requirements(model):
    """Run validation checks on all requirements.

    Returns:
        Dict mapping qualified names to their issue lists; requirements
        with no issues are omitted.
    """
    return {
        req.qualified_name: issues
        for req in model.select(sg.RequirementUsage, exclude_anon=True)
        if (issues := validate_requirement_text(req))
    }

# Run validation
model = sg.read("requirements.sysml")
report = validate_all_requirements(model)

# Log one warning block per failing requirement; an empty report means
# every requirement passed all text-quality checks.
if report:
    logger.warning(f"Validation found {len(report)} requirements with issues:")
    for req_name, issues in report.items():
        logger.warning(f"  {req_name}:")
        for issue in issues:
            logger.warning(f"    - {issue}")
else:
    logger.info("All requirements passed validation")

Naming Convention Enforcement#

Enforce organizational naming standards:

import sysgit as sg
import re

def validate_naming_conventions(model):
    """Enforce naming conventions for requirements.

    Returns:
        List of violation dicts, each with 'requirement' and 'issue' keys.
    """
    violations = []
    # Compile once; both patterns are reused for every requirement.
    id_pattern = re.compile(r'^REQ-\d{3}-\d{3}$')
    special_chars = re.compile(r'[^a-zA-Z0-9\s\-]')

    for req in model.select(sg.RequirementUsage, exclude_anon=True):
        short_name = req.get('short_name')
        declared_name = req.get('declared_name')

        # Short names must look like REQ-XXX-YYY.
        if short_name and not id_pattern.match(short_name):
            violations.append({
                'requirement': req.qualified_name,
                'issue': f"Invalid short name format: {short_name}",
                'expected': "REQ-XXX-YYY"
            })

        if not declared_name:
            continue

        # Declared names: alphanumerics, whitespace and hyphens only.
        if special_chars.search(declared_name):
            violations.append({
                'requirement': req.qualified_name,
                'issue': f"Declared name contains special characters",
                'name': declared_name
            })

        # Declared names must be in title case.
        if declared_name != declared_name.title():
            violations.append({
                'requirement': req.qualified_name,
                'issue': "Declared name not in title case",
                'name': declared_name
            })

    return violations

model = sg.read("requirements.sysml")
violations = validate_naming_conventions(model)

if violations:
    logger.error(f"Found {len(violations)} naming violations:")
    for v in violations:
        logger.error(f"  {v['requirement']}: {v['issue']}")
    # NOTE: relies on `import sys` from the script template above.
    sys.exit(1)  # Fail the pipeline
else:
    logger.info("All naming conventions satisfied")

Traceability Validation#

Ensure complete traceability:

import sysgit as sg

def validate_traceability(model):
    """Check that all requirements have verifications.

    Args:
        model: SysGit model object.

    Returns:
        Tuple of (coverage percentage, list of unverified qualified names).
    """
    requirements = model.select(sg.RequirementUsage, exclude_anon=True)
    verifications = model.select(sg.VerificationCaseUsage)

    # Build set of verified requirements
    verified_reqs = set()
    for v in verifications:
        # Extract requirements from verify statements
        # This is simplified - actual implementation depends on model structure
        if hasattr(v, 'objective'):
            obj = v['objective']
            if hasattr(obj, 'verify'):
                # NOTE(review): assumes each verify statement exposes a
                # `target` that equals a requirement's qualified name —
                # confirm against the sysgit object model.
                for verify_stmt in obj.verify:
                    verified_reqs.add(verify_stmt.target)

    # Find unverified requirements
    unverified = []
    for req in requirements:
        if req.qualified_name not in verified_reqs:
            unverified.append(req.qualified_name)

    # Calculate coverage (guarded against an empty model).
    total = len(requirements)
    verified = total - len(unverified)
    coverage = (verified / total * 100) if total > 0 else 0

    logger.info(f"Verification coverage: {coverage:.1f}% ({verified}/{total})")

    if unverified:
        logger.warning(f"Unverified requirements ({len(unverified)}):")
        for req_name in unverified:
            logger.warning(f"  - {req_name}")

    return coverage, unverified

model = sg.read("requirements.sysml")
coverage, unverified = validate_traceability(model)

# Fail pipeline if coverage below threshold
COVERAGE_THRESHOLD = 95.0  # percent of requirements that must be verified
if coverage < COVERAGE_THRESHOLD:
    logger.error(f"Verification coverage {coverage:.1f}% below threshold {COVERAGE_THRESHOLD}%")
    sys.exit(1)

Model Consistency Checks#

Validate internal model consistency:

import sysgit as sg

def check_model_consistency(model):
    """Run comprehensive consistency checks.

    Args:
        model: SysGit model object.

    Returns:
        List of issue dicts, each with a 'type' key plus details.
    """
    issues = []

    requirements = model.select(sg.RequirementUsage, exclude_anon=True)
    verifications = model.select(sg.VerificationCaseUsage)

    # Check for duplicate short names
    short_names = {}  # short_name -> first qualified name seen
    for req in requirements:
        short_name = req.get('short_name')
        if short_name:
            if short_name in short_names:
                # Each duplicate is paired with the first occurrence only.
                issues.append({
                    'type': 'duplicate_id',
                    'id': short_name,
                    'locations': [short_names[short_name], req.qualified_name]
                })
            else:
                short_names[short_name] = req.qualified_name

    # Check for orphaned references — placeholder: `all_req_names` is
    # prepared for the lookup but the check itself is not implemented.
    all_req_names = {req.qualified_name for req in requirements}
    for v in verifications:
        # Check if verified requirements exist
        # (implementation depends on model structure)
        pass

    # Check for missing metadata
    for req in requirements:
        if not hasattr(req, 'metadata'):
            issues.append({
                'type': 'missing_metadata',
                'requirement': req.qualified_name
            })

    return issues

model = sg.read("requirements.sysml")
issues = check_model_consistency(model)

# A non-empty issue list fails the pipeline via a non-zero exit code.
if issues:
    logger.error(f"Found {len(issues)} consistency issues:")
    for issue in issues:
        logger.error(f"  {issue['type']}: {issue}")
    sys.exit(1)
else:
    logger.info("Model consistency checks passed")

Advanced Patterns#

Multi-File Processing#

Process multiple SysML files in a repository:

import sysgit as sg
import glob
import os

def process_all_sysml_files(directory, operation):
    """Apply operation to all .sysml files in directory.

    Args:
        directory: Root directory searched recursively for .sysml files.
        operation: Callable of (model, filepath) returning a result.

    Returns:
        List of per-file dicts with 'file', 'status' and either
        'result' (success) or 'error' (failed).
    """
    sysml_files = glob.glob(os.path.join(directory, "**/*.sysml"), recursive=True)
    logger.info(f"Found {len(sysml_files)} SysML files")

    results = []
    for filepath in sysml_files:
        try:
            logger.info(f"Processing: {filepath}")
            model = sg.read(filepath)
            outcome = operation(model, filepath)
            model.write(filepath, overwrite=True)
        except Exception as e:
            # One bad file must not abort the whole batch.
            logger.error(f"Error processing {filepath}: {e}")
            results.append({'file': filepath, 'error': str(e), 'status': 'failed'})
        else:
            results.append({'file': filepath, 'result': outcome, 'status': 'success'})

    return results

def update_all_verifications(model, filepath):
    """Example operation: update verifications.

    Returns:
        Number of verification cases found in the model.
    """
    verifications = model.select(sg.VerificationCaseUsage)
    # Apply updates...
    return len(verifications)

# Process all files
# Each entry in `results` records one file's outcome (see
# process_all_sysml_files).
results = process_all_sysml_files("sysgit/", update_all_verifications)

# Summary
success = sum(1 for r in results if r['status'] == 'success')
failed = sum(1 for r in results if r['status'] == 'failed')
logger.info(f"Processed {success} files successfully, {failed} failed")

Conditional Updates#

Only update model if specific conditions are met:

import sysgit as sg
from datetime import datetime, timedelta

def conditional_update(model, conditions):
    """Only update if all conditions are satisfied.

    Args:
        model: SysGit model object.
        conditions: Dict of condition names to thresholds (see
            all_conditions_met for the supported keys).

    Returns:
        True if the update was performed, False if it was skipped.
    """

    # Check conditions
    if not all_conditions_met(model, conditions):
        logger.info("Conditions not met, skipping update")
        return False

    # Proceed with update
    logger.info("Conditions satisfied, proceeding with update")
    # ... perform updates ...

    return True

def all_conditions_met(model, conditions):
    """Return True only when every supplied condition is satisfied.

    Supported keys in ``conditions``: 'min_coverage' (percentage) and
    'max_age_days' (placeholder, not yet implemented).
    """
    if 'min_coverage' in conditions:
        coverage = calculate_coverage(model)
        if coverage < conditions['min_coverage']:
            logger.warning(f"Coverage {coverage}% below minimum {conditions['min_coverage']}%")
            return False

    if 'max_age_days' in conditions:
        # Check model freshness based on metadata
        # (implementation depends on your metadata structure)
        pass

    return True

# Usage
model = sg.read("requirements.sysml")
# Thresholds the model must meet before the update is applied.
conditions = {
    'min_coverage': 90.0,
    'max_age_days': 30
}
if conditional_update(model, conditions):
    model.write("requirements.sysml", overwrite=True)

Incremental Processing#

Track what's been processed to avoid redundant work:

import sysgit as sg
import json
import hashlib

def get_model_hash(model):
    """Return the SHA-256 hex digest of the model's serialized content."""
    digest = hashlib.sha256()
    digest.update(model.code.encode())
    return digest.hexdigest()

def load_processing_state(state_file):
    """Load previous processing state; a missing file yields an empty dict."""
    state = {}
    try:
        with open(state_file, 'r') as f:
            state = json.load(f)
    except FileNotFoundError:
        pass
    return state

def save_processing_state(state_file, state):
    """Persist processing state as pretty-printed JSON."""
    serialized = json.dumps(state, indent=2)
    with open(state_file, 'w') as f:
        f.write(serialized)

def incremental_process(sysml_file, state_file, operation):
    """Only process the model if it changed since the last run.

    Args:
        sysml_file: Path to the SysML file to read and rewrite.
        state_file: Path to the JSON file holding the last-seen hash.
        operation: Callable taking the model and returning a
            JSON-serializable result.

    Returns:
        True if the model was processed, False if it was unchanged.
    """
    # Imported locally so this snippet is self-contained; the original
    # referenced `datetime` below without importing it anywhere, which
    # raised NameError on the first changed model.
    from datetime import datetime

    model = sg.read(sysml_file)
    current_hash = get_model_hash(model)

    state = load_processing_state(state_file)
    if current_hash == state.get('last_hash'):
        logger.info("Model unchanged since last processing, skipping")
        return False

    logger.info("Model changed, processing")
    result = operation(model)

    # Record what was processed so the next run can skip unchanged models.
    state['last_hash'] = current_hash
    state['last_processed'] = datetime.now().isoformat()
    state['result'] = result
    save_processing_state(state_file, state)

    model.write(sysml_file, overwrite=True)
    return True

# Usage
# NOTE(review): `update_verifications` is not defined in this snippet;
# substitute your own operation callable.
incremental_process(
    "requirements.sysml",
    ".processing_state.json",
    lambda m: update_verifications(m)
)

Integration with External Tools#

DOORS Integration#

Export/import between IBM DOORS and SysGit:

import sysgit as sg
import csv

def export_to_doors_csv(model, output_file):
    """Export requirements to DOORS-compatible CSV.

    Args:
        model: SysGit model object.
        output_file: Destination CSV path.
    """
    requirements = model.select(sg.RequirementUsage, exclude_anon=True)

    with open(output_file, 'w', newline='') as f:
        writer = csv.writer(f)
        # Header row uses DOORS column names.
        writer.writerow(['Object ID', 'Object Text', 'Priority', 'Status'])
        for req in requirements:
            row = [
                req.get('short_name', ''),
                getattr(req, 'text', ''),
                req.metadata.get('priority', ''),
                req.metadata.get('status', '')
            ]
            writer.writerow(row)

    logger.info(f"Exported {len(requirements)} requirements to {output_file}")

def import_from_doors_csv(model, input_file):
    """Import requirement updates from DOORS CSV.

    Args:
        model: SysGit model object.
        input_file: CSV exported from DOORS with Object ID/Text/Priority/Status.

    Returns:
        Number of requirements updated.
    """
    with open(input_file, 'r') as f:
        updates = {
            row['Object ID']: {
                'text': row['Object Text'],
                'priority': row['Priority'],
                'status': row['Status']
            }
            for row in csv.DictReader(f)
        }

    updated_count = 0
    for req in model.select(sg.RequirementUsage, exclude_anon=True):
        req_id = req.get('short_name')
        if req_id not in updates:
            continue
        fields = updates[req_id]
        req.text = fields['text']
        req.metadata['priority'] = fields['priority']
        req.metadata['status'] = fields['status']
        updated_count += 1

    logger.info(f"Updated {updated_count} requirements from DOORS")
    return updated_count

# Usage
model = sg.read("requirements.sysml")
export_to_doors_csv(model, "requirements_export.csv")
# ... edit in DOORS ...
# Re-import the edited CSV and persist the merged changes.
import_from_doors_csv(model, "requirements_updated.csv")
model.write("requirements.sysml", overwrite=True)

Jira Integration#

Sync requirements with Jira issues:

import sysgit as sg
import requests

def sync_with_jira(model, jira_config):
    """Synchronize requirements with Jira issues.

    Args:
        model: SysGit model object.
        jira_config: Dict with 'url', 'username', 'api_token' and 'jql'.

    Returns:
        Number of requirements carrying a 'jira_key' metadata entry.

    Raises:
        requests.HTTPError: If the Jira search request fails.
    """
    # Fetch issues from Jira. The timeout keeps CI from hanging, and
    # raise_for_status surfaces auth/JQL errors instead of a cryptic
    # KeyError on the missing 'issues' field.
    response = requests.get(
        f"{jira_config['url']}/rest/api/2/search",
        params={'jql': jira_config['jql']},
        auth=(jira_config['username'], jira_config['api_token']),
        timeout=30
    )
    response.raise_for_status()
    # NOTE(review): Jira paginates search results (default 50 issues);
    # large result sets need startAt/maxResults handling.
    issues_by_key = {issue['key']: issue for issue in response.json()['issues']}

    requirements = model.select(sg.RequirementUsage, exclude_anon=True)

    for req in requirements:
        # Find matching Jira issue
        jira_key = req.metadata.get('jira_key')
        issue = issues_by_key.get(jira_key) if jira_key else None
        if issue is None:
            continue

        # Update requirement from Jira
        fields = issue['fields']
        req.metadata['status'] = fields['status']['name']
        # Unassigned issues carry assignee == None; don't crash on them.
        assignee = fields.get('assignee')
        if assignee:
            req.metadata['assignee'] = assignee['displayName']
        logger.debug(f"Updated {req.qualified_name} from Jira {jira_key}")

    return len([r for r in requirements if r.metadata.get('jira_key')])

# Usage
model = sg.read("requirements.sysml")
# SECURITY: keep the API token in an environment variable or CI secret,
# never in committed source.
jira_config = {
    'url': 'https://your-domain.atlassian.net',
    'username': 'user@example.com',
    'api_token': 'your_api_token',
    'jql': 'project = SYSENG AND type = Requirement'
}
updated = sync_with_jira(model, jira_config)
logger.info(f"Synchronized {updated} requirements with Jira")
model.write("requirements.sysml", overwrite=True)

See Also#