from ziet import Action, Agent, memory
from ziet.integrations import google, apify, openai, stripe, sendgrid
from ziet import PickActionsStrategy, HandOffStrategy, ManualApprovalStrategy, input
@Action(
id="search_flights",
name="Search Flights",
description="Search Google for flight prices",
timeout=30,
retries=2
)
def search_flights(origin: str, dest: str, date: str) -> list:
"""Search Google for flight information."""
query = f"flights from {origin} to {dest} on {date}"
# Use built-in Google integration
results = google.search(query, num_results=10)
# Store results in memory
memory.add(key="search_results", value=results)
return results
@Action(
id="scrape_airline",
name="Scrape Airline Website",
description="Scrape airline website for detailed pricing",
timeout=60
)
def scrape_airline(url: str) -> None:
"""Extract pricing details from airline website."""
# Use built-in Apify integration
data = apify.scrape(
url=url,
extract=[".price", ".flight-details", ".airline-name"]
)
# Store HTML body in memory for later analysis
memory.add(key=url, value=data.get("html", ""))
@Action(
id="generate_summary",
name="Generate Summary",
description="Generate summary report using LLM",
timeout=30
)
def generate_summary() -> str:
"""Create a natural language summary of findings."""
# Retrieve search results from memory
search_results = memory.get("search_results")
# Retrieve scraped data from memory
scraped_data = []
for result in search_results:
url = result.get("url")
if url:
body = memory.get(url)
if body:
scraped_data.append({"url": url, "content": body[:500]})
prompt = f"""
Analyze these flight search results and provide a summary:
Search Results:
{search_results}
Detailed Pricing:
{scraped_data}
Provide:
1. Cheapest option
2. Best value option
3. Recommended choice with reasoning
"""
response = openai.chat(
messages=[{"role": "user", "content": prompt}],
model="gpt-4o-mini"
)
# Store summary in memory
memory.add(key="final_summary", value=response)
return response
@Action(
id="book_flight",
name="Book Flight",
description="Book the selected flight",
timeout=60
)
def book_flight(flight_details: dict) -> dict:
"""Book the flight."""
# Booking logic here
booking = {
"booking_id": f"BK{flight_details.get('id', '12345')}",
"status": "confirmed"
}
memory.add(key="booking", value=booking)
return booking
@Action(
id="charge_payment",
name="Charge Payment",
description="Charge customer payment",
timeout=30
)
def charge_payment(amount: int, email: str) -> dict:
"""Charge payment via Stripe."""
payment = stripe.create_payment_intent(
amount=amount * 100,
currency="usd"
)
memory.add(key="payment", value=payment)
return payment
@Action(
id="send_confirmation",
name="Send Confirmation",
description="Send booking confirmation email",
timeout=20
)
def send_confirmation(email: str) -> bool:
"""Send confirmation email."""
# Retrieve booking from memory
booking = memory.get("final_booking")
sendgrid.send(
to=email,
subject=f"Flight Booked - {booking['booking_id']}",
body=f"<h1>Booking Confirmed</h1><p>{booking['booking_id']}</p>",
html=True
)
return True