-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgemini.py
258 lines (221 loc) · 9.6 KB
/
gemini.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
"""
title: Gemini Protocol Tool
author: projectmoon
author_url: https://git.agnos.is/projectmoon/open-webui-filters
version: 0.2.2
license: AGPL-3.0+
required_open_webui_version: 0.4.3
requirements: ignition-gemini
"""
import re
import ignition
from ignition import RedirectResponse, SuccessResponse
from pydantic import BaseModel, Field
from typing import Optional
# From https://stackoverflow.com/questions/33404752/removing-emojis-from-a-string-in-python
def remove_emojis(text):
    """Return ``text`` with every run of emoji characters deleted.

    Only code points inside the four Unicode ranges listed below are
    removed; all other characters (including surrounding whitespace) are
    left exactly where they were.

    :param text: The string to clean.
    :return: ``text`` without the matched characters.
    """
    emoji_ranges = (
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags (iOS)
    )
    return re.sub("[" + emoji_ranges + "]+", "", text, flags=re.UNICODE)
def extract_title(content: str, fallback: str="") -> Optional[str]:
    """Pick a display title from the first usable heading in ``content``.

    A heading is any line that starts with ``#`` once surrounding
    whitespace is stripped. The heading markers and any emoji matched by
    ``remove_emojis`` are dropped, and the remaining text is trimmed so
    that ``"# My Capsule"`` yields ``"My Capsule"`` rather than
    ``" My Capsule"``.

    Headings that end up empty after cleaning (a bare ``#`` or an
    emoji-only heading) are skipped so the caller never receives a blank
    title when a better one, or the fallback, is available.

    :param content: The document text to scan.
    :param fallback: Value returned when no usable heading exists.
    :return: The cleaned heading text, or ``fallback``.
    """
    for raw_line in content.splitlines():
        line = raw_line.strip()
        if not line.startswith("#"):
            continue
        # Strip the markers first, then emoji, then the whitespace that
        # either of those removals may have left at the edges.
        title = remove_emojis(line.lstrip("#")).strip()
        if title:
            return title
    return fallback
def result_instructions(url: str, redirect: bool=False) -> str:
    """Build the prompt header that introduces fetched Gemini content.

    The header names the URL that was fetched and tells the model to
    report the content as-is.

    :param url: The URL that was fetched; interpolated into the header.
    :param redirect: Accepted for call compatibility; it does not
        currently change the generated text.
    :return: The header text, terminated by a blank line.
    """
    # Every fragment that ends a sentence carries its own trailing space
    # so that adjacent-literal concatenation does not glue two sentences
    # together (e.g. "question.Do not").
    content_instructions = (
        "Report the content to the user and answer their question. "
        "Do not make assumptions. This is the content you need. "
        "Even if you think it's not the right content, report the "
        "content anyway. Do not say you think it's the wrong content. "
        "You MUST use this content."
    )
    return (
        "# Gemini Content Fetch Result\n"
        f"Content was successfully fetched for the URL: {url}\n"
        f"{content_instructions}\n\n"
    )
def instructions(url: str, redirect: bool=False) -> str:
    """Build the full prompt that precedes fetched Gemtext content.

    The text starts with the header from ``result_instructions`` and
    continues with rules for turning Gemtext link lines into Markdown
    links.

    :param url: The fetched URL, forwarded to ``result_instructions``.
    :param redirect: Forwarded unchanged to ``result_instructions``.
    :return: The complete instruction text.
    """
    header = result_instructions(url, redirect)

    intro = (
        "Here are more instructions you must follow. "
        "Render all Gemini links as Markdown links. Examples:\n\n"
    )
    markdown_examples = (
        " - [gemini://example.com](gemini://example.com)\n"
        " - [a gemini capsule](gemini://example.com)\n"
        " - [my personal capsule](gemini://example.com/personal)\n\n"
    )
    link_syntax = (
        "A Gemini link starts with => on the line, and then the URL follows:\n\n"
        " - `=> /page A Gemini Page`: A relative link to `/page` titled `A Gemini Page`\n"
        " - `=> gemini://example.com/place` An absolute link with no URL title \n\n"
    )
    rendering_rules = (
        "When rendering relative links, always make them absolute links.\n"
        "If the link has a title, render the link title verbatim:\n"
        " - `=> gemini://example.com My Page` becomes `[My Page](gemini://example.com)`\n"
        " - `=> gemini://example.com` becomes `[gemini://example.com](gemini://example.com)`.\n"
        " - `=> gemini://example.com 🐂 My Page` becomes `[🐂 My Page](gemini://example.com)`\n\n"
    )
    closing = "The Gemtext content is below in the code block."

    return "".join(
        (header, intro, markdown_examples, link_syntax, rendering_rules, closing)
    )
class GeminiFetcher:
    """Fetch a single Gemini URL, following redirects and reporting status.

    Progress, errors and a citation for the fetched page are pushed to
    the optional ``event_emitter`` coroutine as dictionaries with a
    ``type`` and a ``data`` key. When no emitter is supplied, the event
    helpers do nothing.
    """

    # Number of redirect hops tolerated before the fetch is abandoned.
    MAX_REDIRECTS = 5

    # Prefixes that ``correct_url`` swaps for a plain ``gemini://`` scheme.
    # The doubled-scheme forms are listed first so they are tried before
    # the default "prepend gemini://" rule.
    _WRONG_PREFIXES = (
        "gemini://http://",
        "gemini://https://",
        "https://",
        "http://",
    )

    def __init__(self, gemini_url: str, correct_urls: bool=False, web_proxy: str="", event_emitter=None):
        """Store the fetch settings.

        :param gemini_url: The URL requested by the caller.
        :param correct_urls: Whether the first request should be passed
            through ``correct_url``.
        :param web_proxy: Template for citation links; every ``<URL>`` in
            it is replaced by the original URL. Empty means "link to the
            original URL directly".
        :param event_emitter: Awaitable callback taking one event dict,
            or ``None`` to disable events.
        """
        self.original_url = gemini_url
        # Target of the most recent redirect; None until one is followed.
        self.current_url = None
        self.correct_urls = correct_urls
        self.web_proxy = web_proxy
        self.event_emitter = event_emitter

    async def fetch_event(self, done: bool):
        """Emit a status event saying the fetch is in progress or complete.

        :param done: ``True`` for the "Fetched" event, ``False`` for the
            "Fetching" event.
        """
        if not self.event_emitter:
            return

        description = (f"Fetched: {self.original_url}" if done
                       else f"Fetching: {self.original_url}")

        await self.event_emitter({
            "type": "status",
            "data": {
                "status": "complete" if done else "in_progress",
                "description": description,
                "done": done,
            },
        })

    async def error_event(self, message: str="There was an error"):
        """Emit a terminal status event carrying an error description.

        :param message: Text placed in the event's ``description``.
        """
        if not self.event_emitter:
            return

        await self.event_emitter({
            "type": "status",
            "data": {
                "status": "error",
                "description": message,
                "done": True,
            },
        })

    async def create_citation(self, title: str, content: str):
        """Emit a ``source`` event describing the fetched page.

        :param title: Name shown for the source.
        :param content: Page text attached as the source document.
        """
        if not self.event_emitter:
            return

        # With a proxy configured, the link goes through the proxy template.
        link = (self.web_proxy.replace("<URL>", self.original_url)
                if self.web_proxy != "" else self.original_url)

        await self.event_emitter({
            "type": "source",
            "data": {
                "document": [content],
                "metadata": [{"source": title, "html": False }],
                "source": {"name": title, "url": link},
            }
        })

    async def complete_event(self, content: str):
        """Announce a successful fetch: "done" status, then a citation.

        :param content: The fetched page text; its first heading (or the
            original URL as fallback) becomes the citation title.
        """
        title = extract_title(content, self.original_url)
        await self.fetch_event(done=True)
        await self.create_citation(title, content)

    def correct_url(self, url: str) -> str:
        """Rewrite common malformed URLs so that they use ``gemini://``.

        ``http://``, ``https://`` and the doubled forms
        ``gemini://http://`` / ``gemini://https://`` are replaced by a
        single ``gemini://``; a URL with none of these prefixes and no
        ``gemini://`` scheme gets ``gemini://`` prepended.

        :param url: The URL to normalise.
        :return: The corrected URL, or ``url`` unchanged when it is
            already well-formed or consists of a wrong prefix alone.
        """
        for prefix in self._WRONG_PREFIXES:
            if url.startswith(prefix):
                remainder = url[len(prefix):]
                # Nothing after the prefix means there is nothing to fix.
                return f"gemini://{remainder}" if remainder else url

        if not url.startswith("gemini://"):
            return f"gemini://{url}"

        return url

    async def fetch(self, prev_url: Optional[str]=None, redirects: int=0) -> dict:
        """Fetch the current URL, recursing once per redirect.

        :param prev_url: URL of the page that redirected here, or
            ``None`` for the initial request.
        :param redirects: Number of redirects followed so far.
        :return: A dict with ``success`` (bool), ``content`` (page text on
            success, otherwise a message for the model) and
            ``redirected`` (whether at least one redirect was followed).
        """
        await self.fetch_event(done=False)
        gemini_url = (self.current_url
                      if self.current_url is not None
                      else self.original_url)
        redirected = prev_url is not None

        if redirects > self.MAX_REDIRECTS:
            message = f"Too many redirects (ended at {gemini_url})"
            # Emit a terminal event like every other failure path so the
            # status does not stay "in_progress".
            await self.error_event(message)
            return {
                "success": False,
                "content": message,
                "redirected": redirected
            }

        # Only the URL typed by the user is corrected, not redirect targets.
        if self.correct_urls and not prev_url:
            corrected_url = self.correct_url(gemini_url)
            if corrected_url != gemini_url:
                print(f"[Gemini] URL '{gemini_url}' corrected to '{corrected_url}'")
                gemini_url = corrected_url

        if not prev_url:
            print(f"[Gemini] Fetching: {gemini_url}")
        else:
            print(f"[Gemini] Fetching: {gemini_url} (redirected from {prev_url})")

        try:
            # NOTE(review): this call is not awaited, so it presumably
            # blocks the event loop while the request runs; consider
            # asyncio.to_thread if that becomes a problem.
            response = ignition.request(gemini_url, raise_errors=True, referer=prev_url)
            if isinstance(response, SuccessResponse):
                content = self.handle_content(response)
                await self.complete_event(content)
                return {
                    "success": True,
                    "content": content,
                    "redirected": redirected
                }
            elif isinstance(response, RedirectResponse):
                # The page being left is the one just requested. Using
                # self.current_url here would yield None on the first
                # hop, losing the referer and the "redirected" flag.
                # The target is resolved against the requesting URL so a
                # relative redirect becomes absolute before the next hop.
                self.current_url = ignition.url(response.data(), gemini_url)
                return await self.fetch(gemini_url, redirects + 1)
            else:
                await self.error_event(f"Unsupported status code {response.status}")
                print(f"[Gemini] Unsupported {response.status} code for '{gemini_url}'")
                message = (f"Tell the user there was a {response.status} status code. "
                           f"Support for handling {response.status} is not implemented yet.")
                return { "success": False, "content": message, "redirected": redirected }
        except Exception as e:
            # Deliberately broad: any failure is turned into a message for
            # the model rather than an exception for the caller.
            print(f"[Gemini] error: {e}")
            message = f"Tell the user there was an error fetching the page: {e}"
            await self.error_event(str(e))
            return {
                "success": False,
                "content": message,
                "redirected": redirected
            }

    def handle_content(self, resp: "SuccessResponse") -> str:
        """Return the body of a successful response as stripped text.

        :param resp: The success response; ``resp.meta`` holds the MIME
            type optionally followed by ``;``-separated parameters.
        :return: The response data with surrounding whitespace removed.
        :raises ValueError: If the MIME type is not ``text/gemini``.
        """
        # Keep only the part before the first ";" so that any number of
        # parameters (charset, lang, ...) is accepted. The comparison is
        # lower-cased because MIME type names are case-insensitive.
        mime_type = resp.meta.split(";", 1)[0].strip().lower()

        if mime_type == "text/gemini":
            return resp.data().strip()

        raise ValueError(f"Not yet able to handle MIME type {mime_type}")
class Tools:
    # Tool container: holds the configurable valves and exposes
    # get_gemini_page as the single tool function.

    class Valves(BaseModel):
        # Settings read by get_gemini_page on every call.
        attempt_url_correction: bool = Field(
            default=True,
            description="Attempt to correct malformed URLs (default enabled).",
        )
        gemini_web_proxy: str = Field(
            default="",
            description="Web proxy to use for clickable citation links.",
        )

    def __init__(self):
        self.user_valves = None
        self.valves = self.Valves()

    async def get_gemini_page(self, gemini_url: str, __event_emitter__) -> str:
        """
        Fetches Gemini capsules, content, and web pages over Gemini Protocol.
        Use this if the user requests a gemini:// URL.
        :param gemini_url: The URL to fetch. The URL MUST begin with gemini://.
        :return: The fetched data as Markdown.
        """
        settings = self.valves
        fetcher = GeminiFetcher(
            gemini_url=gemini_url,
            correct_urls=settings.attempt_url_correction,
            web_proxy=settings.gemini_web_proxy,
            event_emitter=__event_emitter__,
        )
        outcome = await fetcher.fetch()

        # On failure the content is already a message for the model.
        if not outcome["success"]:
            return outcome["content"]

        # On success, prepend the rendering instructions and fence the page.
        preamble = instructions(gemini_url, redirect=outcome["redirected"])
        return f"{preamble}\n\n```\n{outcome['content']}\n```"