-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathweather-bot.py
218 lines (185 loc) · 8.7 KB
/
weather-bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import aiohttp
import asyncio
import os
import sys
from loguru import logger
from dotenv import load_dotenv
from noaa_sdk import NOAA
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
logger.remove(0)
logger.add(sys.stderr, level="DEBUG", colorize=True)
load_dotenv()
new_level_symbol = ". ⛅︎ ."
new_level = logger.level(new_level_symbol, no=38, color="<light-magenta><BLACK>")
# webrtc room to talk to the bot
async def get_daily_room():
room_override = os.getenv("DAILY_ROOM")
if room_override:
return room_override
else:
async with aiohttp.ClientSession() as session:
daily_rest_helper = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY"),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=session,
)
room_config = await daily_rest_helper.create_room(
DailyRoomParams(
properties={"enable_prejoin_ui":False}
)
)
return room_config.url
# rest API call to NOAA to get current weather
async def get_noaa_simple_weather(latitude: float, longitude: float, **kwargs):
n = NOAA()
description = False
fahrenheit_temp = 0
try:
observations = n.get_observations_by_lat_lon(latitude, longitude, num_of_stations=1)
for observation in observations:
description = observation["textDescription"]
celsius_temp = observation["temperature"]["value"]
if description:
break
fahrenheit_temp = (celsius_temp * 9 / 5) + 32
except Exception as e:
logger.log(new_level_symbol, f"Error getting NOAA weather: {e}")
logger.log(new_level_symbol, f"get_noaa_simple_weather * results: {description}, {fahrenheit_temp}")
return description, fahrenheit_temp
async def main():
bot_name = "⛅︎ current w e a t h e r bot ⛅︎"
room_url = await get_daily_room()
# yes, it was worth the time to do this
logger.opt(colors=True).log(new_level_symbol, f"<black><RED>_____*</RED></black>")
logger.opt(colors=True).log(new_level_symbol, f"<black><LIGHT-RED>_____*</LIGHT-RED></black>")
logger.opt(colors=True).log(new_level_symbol, f"<black><Y>_____*</Y></black>")
logger.opt(colors=True).log(new_level_symbol, f"<black><G>_____*</G></black> Navigate to")
logger.opt(colors=True).log(new_level_symbol, f"<black><C>_____*</C></black> <u><light-cyan>{room_url}</light-cyan></u>")
logger.opt(colors=True).log(new_level_symbol, f"<black><E>_____*</E></black> to talk to")
logger.opt(colors=True).log(new_level_symbol, f"<black><LIGHT-BLUE>_____*</LIGHT-BLUE></black> <light-blue>{bot_name}</light-blue>")
logger.opt(colors=True).log(new_level_symbol, f"<black><MAGENTA>_____*</MAGENTA></black>")
logger.opt(colors=True).log(new_level_symbol, f"<black><R>_____*</R></black>")
transport = DailyTransport(
room_url,
None,
bot_name,
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
)
system_instruction = """
You are a helpful assistant who can answer questions and use tools.
You have a tool called "get_weather" that can be used to get the current weather.
If the user asks for the weather, call this tool and do not ask the user for latitude and longitude.
Infer latitude and longitude from the location and use those in the get_weather tool.
Use ONLY this tool to get weather information. Never use other tools or apis, even if you encounter an error.
Say you are having trouble retrieving the weather if the tool call does not work.
If you are asked about a location outside the United States, politely respond that you are only able to retrieve current weather information for locations in the United States.
If a location is not provided, always ask the user what location for which they would like the weather.
"""
tools = [
{
"function_declarations": [
{
"name": "get_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The location for the weather request.",
},
"latitude": {
"type": "string",
"description": "Provide this by infering the latitude from the location. Supply latitude as a string. For example, '42.3601'.",
},
"longitude": {
"type": "string",
"description": "Provide this by infering the longitude from the location. Supply longitude as a string. For example, '-71.0589'.",
},
},
"required": ["location", "latitude", "longitude"],
},
},
]
}
]
llm = GeminiMultimodalLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,
)
async def fetch_weather_from_api(
function_name, tool_call_id, args, llm, context, result_callback
):
location = args["location"]
latitude = float(args["latitude"])
longitude = float(args["longitude"])
description, fahrenheit_temp = None, None
logger.log(new_level_symbol, f"fetch_weather_from_api * location: {location} - '{latitude}, {longitude}'")
if latitude and longitude:
# actual external rest API call
description, fahrenheit_temp = await get_noaa_simple_weather(latitude, longitude)
else:
return await result_callback("Sorry, I don't recognize that location.")
if not fahrenheit_temp:
return await result_callback(
f"I'm sorry, I can't get the weather for {location} right now. Can you ask again please?"
)
if not description:
return await result_callback(
f"According to noah, the weather in {location} is currently {round(fahrenheit_temp)} degrees."
)
else:
return await result_callback(
f"According to noah, the weather in {location} is currently {round(fahrenheit_temp)} degrees and {description}."
)
llm.register_function("get_weather", fetch_weather_from_api)
# continue to setup pipeline
context = OpenAILLMContext(
[{"role": "user", "content": "Say hello. Make a subtle weather pun."}],
)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
# set event handlers
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
logger.log(new_level_symbol, f"Participant left: {participant}")
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())