# Saturday, April 4, 2026
#
# orchestrator_agent.py

from google.adk.agents import Agent

from .parts_agent import parts_agent


# Column order of every asset record; it is also the key order of each
# per-asset dictionary in ASSET_DB.
_ASSET_FIELDS = (
    "type",
    "brake_wear_pct",
    "tyre_pressure_bar",
    "tru_fault_code",
    "last_service_days_ago",
    "mileage_since_service",
)

# One row per asset, values listed in _ASSET_FIELDS order.
# A tru_fault_code of None means no fault code is recorded for that asset.
_ASSET_ROWS = {
    "TRL-001": ("Reefer Trailer", 78, 7.2, "TRU-E04", 42, 12400),
    "TRL-002": ("Curtainsider", 31, 8.1, None, 12, 3200),
    "TNK-001": ("Bulk Tanker", 65, 6.8, None, 30, 9800),
}

# Static asset records keyed by asset ID; the tool functions in this module
# look assets up here.
ASSET_DB = {
    asset_key: dict(zip(_ASSET_FIELDS, row))
    for asset_key, row in _ASSET_ROWS.items()
}



def get_asset_telemetry(asset_id: str) -> dict:
    """
    Looks up the stored telemetry record for a single trailer or tanker asset.

    The identifier is upper-cased and stripped of surrounding whitespace
    before the lookup.

    Args:
        asset_id: The unique asset identifier (e.g., TRL-001, TNK-001)

    Returns:
        A dictionary with "asset_id" followed by the asset's stored readings.
        For an unknown identifier, a dictionary with an "error" message and
        the list of "available_assets".
    """
    lookup_key = asset_id.upper().strip()

    try:
        readings = ASSET_DB[lookup_key]
    except KeyError:
        # Unknown asset: report the valid identifiers alongside the error.
        return {
            "error": f"Asset {lookup_key} not found.",
            "available_assets": list(ASSET_DB.keys()),
        }

    # Build a fresh dict so the caller never receives the stored record itself.
    telemetry = {"asset_id": lookup_key}
    telemetry.update(readings)
    return telemetry



def predict_failure_risk(asset_id: str) -> dict:
    """
    Predicts component failure risk and estimated days-to-failure for a given asset.

    Rules applied to the asset's ASSET_DB record:
      - brake_wear_pct above 70 -> HIGH brake risk; above 50 -> MEDIUM.
      - tyre_pressure_bar below 7.0 -> HIGH tyre risk.
      - any non-empty tru_fault_code -> HIGH refrigeration unit risk.
    overall_priority is CRITICAL if any component is HIGH, MODERATE if any
    component is MEDIUM, otherwise LOW.

    Args:
        asset_id: The unique asset identifier (case and surrounding
            whitespace are ignored)

    Returns:
        A risk assessment with component-level scores and overall priority.
        For an unknown asset, a dictionary with an "error" message and the
        list of "available_assets".
    """
    asset_id = asset_id.upper().strip()
    if asset_id not in ASSET_DB:
        # Same error shape as get_asset_telemetry, so the caller can recover
        # from a bad identifier without a second lookup.
        return {
            "error": f"Asset {asset_id} not found.",
            "available_assets": list(ASSET_DB.keys()),
        }

    data = ASSET_DB[asset_id]
    risks = []

    brake_wear = data["brake_wear_pct"]
    if brake_wear > 70:
        risks.append({
            "component": "Brake System",
            "risk_level": "HIGH",
            "estimated_days_to_failure": 3,
            "recommended_action": "Immediate brake adjustment or pad replacement",
            "parts_needed": "Brake pads x4, Brake adjustment kit",
        })
    elif brake_wear > 50:
        risks.append({
            "component": "Brake System",
            "risk_level": "MEDIUM",
            "estimated_days_to_failure": 10,
            "recommended_action": "Schedule brake inspection within 7 days",
            "parts_needed": "Brake pads x4",
        })

    if data["tyre_pressure_bar"] < 7.0:
        risks.append({
            "component": "Tyres",
            "risk_level": "HIGH",
            "estimated_days_to_failure": 2,
            "recommended_action": "Immediate tyre pressure check and inflation",
            "parts_needed": "Nitrogen inflation kit, Tyre pressure gauge",
        })

    if data["tru_fault_code"]:
        risks.append({
            "component": "Refrigeration Unit (TRU)",
            "risk_level": "HIGH",
            "estimated_days_to_failure": 1,
            "recommended_action": (
                f"TRU fault {data['tru_fault_code']} detected — inspect compressor "
                f"and refrigerant pressure immediately"
            ),
            "parts_needed": "Refrigerant R-404A, Compressor filter kit",
        })

    # Roll the component levels up into a single fleet-facing priority.
    levels = {r["risk_level"] for r in risks}
    if "HIGH" in levels:
        priority = "CRITICAL"
    elif "MEDIUM" in levels:
        priority = "MODERATE"
    else:
        priority = "LOW"

    # Empty string when nothing was flagged.
    all_parts = ", ".join(
        r["parts_needed"] for r in risks if r.get("parts_needed")
    )

    return {
        "asset_id": asset_id,
        "asset_type": data["type"],
        "overall_priority": priority,
        # Always report at least one entry so the list is never empty.
        "component_risks": risks if risks else [{
            "component": "All Systems",
            "risk_level": "LOW",
            "recommended_action": "No immediate action required",
            "parts_needed": "",
        }],
        "all_parts_needed": all_parts,
    }



def generate_job_card(asset_id: str) -> dict:
    """
    Auto-generates a structured repair job card for a given asset.

    The card is derived from predict_failure_risk: every HIGH or MEDIUM
    component risk becomes one job line, and the combined parts list is
    carried over for the parts sub-agent to check.

    Args:
        asset_id: The unique asset identifier

    Returns:
        A structured job card. Parts availability will be checked by the parts
        sub-agent. If the asset is unknown, the error dictionary from
        predict_failure_risk is returned unchanged.
    """
    risk = predict_failure_risk(asset_id)
    if "error" in risk:
        return risk

    # Use the identifier as normalized by predict_failure_risk so that input
    # such as " trl-001 " does not leak into job_card_id / asset_id.
    canonical_id = risk["asset_id"]
    parts = risk["all_parts_needed"]

    jobs = [
        f"[{comp['risk_level']}] {comp['component']}: {comp['recommended_action']}"
        for comp in risk["component_risks"]
        if comp["risk_level"] in ("HIGH", "MEDIUM")
    ]

    if parts:
        status = "PENDING PARTS CONFIRMATION"
        next_step = f"Parts agent must now confirm availability of: {parts}"
    else:
        # Nothing to confirm: avoid a dangling "availability of: " message
        # and a status that waits on a parts check that will never happen.
        status = "NO PARTS REQUIRED"
        next_step = "No parts to confirm — proceed with routine inspection"

    return {
        "job_card_id": f"JC-{canonical_id}-AUTO",
        "asset_id": canonical_id,
        "asset_type": risk["asset_type"],
        "priority": risk["overall_priority"],
        "jobs": jobs if jobs else ["No urgent jobs — routine inspection only"],
        "parts_required": parts or "None",
        "depot_routing": "Depot A — Manchester (nearest depot)",
        "technician_skill": "Heavy Vehicle Technician Level 2",
        "status": status,
        "next_step": next_step,
    }



def get_fleet_risk_summary() -> dict:
    """
    Returns a ranked risk summary across all assets in the fleet.

    Use this when the technician asks for an overview of the whole fleet.

    Each asset's top_risk is its most urgent component: highest risk level
    first, then the fewest estimated days to failure.

    Returns:
        All assets ranked by overall priority with top risks highlighted,
        plus total, critical and moderate asset counts.
    """
    level_rank = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}

    def urgency(component_risk: dict) -> tuple:
        # Lower tuple sorts first. Entries without an estimate (the
        # "All Systems" placeholder) rank last within their level.
        return (
            level_rank.get(component_risk["risk_level"], len(level_rank)),
            component_risk.get("estimated_days_to_failure", float("inf")),
        )

    summary = []
    for asset_id in ASSET_DB:
        risk = predict_failure_risk(asset_id)
        component_risks = risk["component_risks"]
        # Pick the most urgent component rather than whichever rule happened
        # to be evaluated first.
        top_risk = (
            min(component_risks, key=urgency)["component"]
            if component_risks else "None"
        )
        summary.append({
            "asset_id": asset_id,
            "asset_type": risk["asset_type"],
            "priority": risk["overall_priority"],
            "top_risk": top_risk,
            # all_parts_needed is "" when nothing is flagged; report "None"
            # instead of an empty string.
            "parts_needed": risk["all_parts_needed"] or "None",
        })

    # Stable sort: assets with equal priority keep their ASSET_DB order.
    priority_order = {"CRITICAL": 0, "MODERATE": 1, "LOW": 2}
    summary.sort(key=lambda entry: priority_order.get(entry["priority"], 3))

    return {
        "total_assets": len(summary),
        "critical_count": sum(1 for a in summary if a["priority"] == "CRITICAL"),
        "moderate_count": sum(1 for a in summary if a["priority"] == "MODERATE"),
        "fleet_ranked": summary,
    }



# Top-level orchestrator agent: exposes the four tool functions defined in
# this module and registers parts_agent as its sub-agent.
# NOTE: the priority words in the instruction must match the values that
# predict_failure_risk puts in overall_priority (LOW / MODERATE / CRITICAL).
# NOTE(review): the instruction refers to the sub-agent as
# "parts_inventory_agent" — confirm this matches the name set in parts_agent.
root_agent = Agent(
    name="fleetsense_orchestrator",
    model="gemini-2.0-flash",
    description=(
        "TCS FleetSense Orchestrator: coordinates predictive asset health assessment "
        "and delegates parts/inventory queries to the Parts Inventory sub-agent."
    ),
    instruction="""

You are FleetSense, the AI maintenance orchestrator for a trailer and tanker leasing company.


You coordinate with a specialist Parts Inventory sub-agent to deliver complete

actionable maintenance decisions to technicians.


Your workflow for ANY asset query:

1. Fetch telemetry with get_asset_telemetry

2. Predict failure risk with predict_failure_risk

3. If overall_priority is MODERATE or CRITICAL generate a job card with generate_job_card

4. Always delegate parts availability to the parts_inventory_agent sub-agent

   passing it the parts_required field from the job card

5. Combine both results into a final recommendation for the technician


For fleet overview queries use get_fleet_risk_summary then highlight

CRITICAL assets and hand off parts checking for those assets to the sub-agent.


Available assets: TRL-001 (Reefer Trailer), TRL-002 (Curtainsider), TNK-001 (Bulk Tanker)


Always be concise and direct. Technicians are on the workshop floor not at a desk.

Lead with priority and action not with explanations.

""",
    tools=[
        get_asset_telemetry,
        predict_failure_risk,
        generate_job_card,
        get_fleet_risk_summary,
    ],
    sub_agents=[parts_agent],
)

# (blog export footer) No comments:
#
# (blog export footer) Powered By Blogger