-
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathch2rss.py
More file actions
136 lines (104 loc) · 3.81 KB
/
ch2rss.py
File metadata and controls
136 lines (104 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import asyncio
import os
import re
import sys
from datetime import datetime, timezone
from textwrap import shorten

import httpx
from asgiref.wsgi import WsgiToAsgi
from bs4 import BeautifulSoup
from cssutils import parseStyle
from flask import Flask, make_response, request
from flask_caching import Cache
from rfeed import Feed, Item
# Flask application; the cache backend is selected through the CACHE_TYPE
# environment variable and falls back to "SimpleCache".
app = Flask(__name__)
app.config["CACHE_TYPE"] = os.getenv("CACHE_TYPE", "SimpleCache")

# ASGI wrapper around the same WSGI app.
asgi_app = WsgiToAsgi(app)

# The cache is constructed after CACHE_TYPE has been written to app.config.
cache = Cache(app)

# Seconds a rendered feed is cached; also sent as the Cache-Control max-age.
cache_seconds = 3600

if app.config["CACHE_TYPE"] == "MemcachedCache":
    import pylibmc

    # Flush the local memcached instance when the module is loaded.
    pylibmc.Client(["127.0.0.1"]).flush_all()
class ChannelNotFound(Exception):
    """Raised when a channel does not exist or has no web preview to read."""
def make_key(*args, **kwargs):
    """Build the cache key for the current request.

    The key combines the request path with its query string, so the same
    channel requested with different query parameters is cached separately.
    Any arguments passed in are ignored.
    """
    path = request.path
    query = request.query_string
    return f"{path}?{query}"
@app.route("/<channel>")
@cache.cached(timeout=cache_seconds, make_cache_key=make_key)
async def rss(channel):
    """Serve the RSS feed of a Telegram channel.

    The optional ``include`` and ``exclude`` query parameters are passed on
    to ``channel_to_rss``.

    Args:
        channel: Channel name taken from the URL path.

    Returns:
        An XML response with a ``Cache-Control`` header on success,
        ``("Invalid channel name", 400)`` when the name is not 5-32 word
        characters, or a 404 tuple when the channel cannot be found.
    """
    # fullmatch instead of match with "^...$": "$" also matches just before a
    # trailing newline, which would let "name\n" through into the fetched URL.
    if not re.fullmatch(r"\w{5,32}", channel):
        return "Invalid channel name", 400

    # Only the feed generation can raise ChannelNotFound, so only it is guarded.
    try:
        feed_xml = await channel_to_rss(
            channel,
            include=request.args.get("include"),
            exclude=request.args.get("exclude"),
        )
    except ChannelNotFound:
        return f"Channel not found or it cannot be previewed at https://t.me/s/{channel}", 404

    resp = make_response(feed_xml)
    resp.headers["Content-type"] = "text/xml;charset=UTF-8"
    resp.headers["Cache-Control"] = f"max-age={cache_seconds}"
    return resp
def get_message_divs(doc):
    """Return all elements of *doc* that are message bubbles."""
    bubble_selector = "div[class~='tgme_widget_message_bubble']"
    return doc.select(bubble_selector)
def get_link_from_div(div):
    """Return the ``href`` of the first date anchor inside a message div.

    Raises IndexError when the div contains no such anchor.
    """
    date_anchors = div.select("a[href][class='tgme_widget_message_date']")
    first_anchor = date_anchors[0]
    return first_anchor.attrs["href"]
def get_text_from_div(div):
    """Return the text of a message div, one line per text fragment.

    Messages without a text element fall back to the message link, so the
    result is never empty for such messages.
    """
    text_nodes = div.select("div[class~='tgme_widget_message_text']")
    if not text_nodes:
        return get_link_from_div(div)
    return text_nodes[0].get_text("\n", strip=True)
def get_images_from_div(div):
    """Return the image URLs of the photo links inside a message div.

    Each photo link carries its image as the ``background-image`` of its
    inline style; a surrounding ``url(...)`` wrapper is stripped from the value.
    """
    photo_links = div.select("a[class~='tgme_widget_message_photo_wrap']")
    return [
        re.sub(r"^url\((.+)\)$", r"\1", parseStyle(link["style"]).backgroundImage)
        for link in photo_links
    ]
def get_item_from_div(div):
    """Convert a message div into the keyword arguments of a feed item.

    Returns:
        A dict with ``link``, ``title`` (the message text shortened to at most
        250 characters), ``description`` (the full message text) and
        ``pubDate`` (parsed from the ``datetime`` attribute of the message's
        ``time`` element).

    Raises:
        IndexError: if the div lacks a date anchor or a ``time`` element.
    """
    link = get_link_from_div(div)
    # Extract the text once; it feeds both the title and the description.
    text = get_text_from_div(div)
    published = div.select("time[class='time']")[0].attrs["datetime"]
    return {
        "link": link,
        "title": shorten(text, width=250, placeholder="..."),
        "description": text,
        "pubDate": datetime.fromisoformat(published),
    }
async def get_doc_from_url(url):
    """Fetch *url* and parse the response body as an HTML document.

    Args:
        url: Address of the page to download.

    Returns:
        The parsed ``BeautifulSoup`` document.

    Raises:
        ChannelNotFound: if the server answers with a redirect (3xx) status.
        httpx.HTTPStatusError: for any other unsuccessful status, instead of
            silently returning ``None``.
    """
    async with httpx.AsyncClient() as client:
        res = await client.get(url)

    try:
        res.raise_for_status()
    except httpx.HTTPStatusError as e:
        # Decide on the status code rather than on the wording of the
        # exception message, which also embeds the requested URL.
        if 300 <= e.response.status_code < 400:
            raise ChannelNotFound() from e
        raise
    return BeautifulSoup(res.content, "lxml")
def channel_not_found(doc):
    """Tell whether *doc* is the placeholder page shown instead of a channel.

    The page is recognised by its description block starting with
    "If you have Telegram, you can contact @".

    Returns:
        True for the placeholder page, False otherwise (always a real bool).
    """
    descriptions = doc.select("div[class='tgme_page_description']")
    if not descriptions:
        return False
    first_text = descriptions[0].text.strip()
    return first_text.startswith("If you have Telegram, you can contact @")
async def channel_to_rss(channel, include=None, exclude=None):
    """Render the public preview of a Telegram channel as an RSS document.

    Args:
        channel: Channel name; the preview is read from ``https://t.me/s/<channel>``.
        include: If given, keep only items whose description contains this
            text (case-insensitive).
        exclude: If given, drop items whose description contains this text
            (case-insensitive). Applied before ``include``.

    Returns:
        The serialized RSS feed.

    Raises:
        ChannelNotFound: if no preview document could be obtained or the page
            is the "channel not found" placeholder.
    """
    url = f"https://t.me/s/{channel}"
    doc = await get_doc_from_url(url)
    # A missing document is treated like a missing channel rather than
    # failing later with an AttributeError on None.
    if doc is None or channel_not_found(doc):
        raise ChannelNotFound()

    items = [Item(**get_item_from_div(d)) for d in get_message_divs(doc)]
    if exclude:
        unwanted = exclude.lower()
        items = [i for i in items if unwanted not in i.description.lower()]
    if include:
        wanted = include.lower()
        items = [i for i in items if wanted in i.description.lower()]

    feed = Feed(
        title=doc.title.text,
        link=url,
        description=doc.select("meta[content][property='og:description']")[0].attrs["content"],
        # rfeed documents that dates must be given in GMT; a naive local
        # time would be emitted with the wrong clock value.
        lastBuildDate=datetime.now(timezone.utc),
        items=items,
    )
    return feed.rss()
async def cli_main():
    """Print the RSS feed for the channel named on the command line.

    ``sys.argv[1]`` is the channel name; an optional ``sys.argv[2]`` is used
    as the ``include`` filter.
    """
    argv = sys.argv
    include = argv[2] if len(argv) > 2 else None
    feed_xml = await channel_to_rss(argv[1], include=include)
    print(feed_xml)
# When executed as a script, run the command-line entry point to completion.
if __name__ == "__main__":
    asyncio.run(cli_main())