Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tasks/requests/core/database/hellper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ def get_namespace(namespace):
3: ":",
4: ":",
5: ":",
6: ":",
6: "ملف:",
7: ":",
8: ":",
9: ":",
Expand Down
Empty file.
59 changes: 59 additions & 0 deletions tasks/requests/replace_image/load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import logging

import pywikibot
from sqlalchemy import select
from sqlalchemy.orm import Session

from core.utils.wikidb import Database
from tasks.requests.core.database.engine import engine
from tasks.requests.core.database.models import Request, Status, Page

# Create an instance of the RequestsPage class
site = pywikibot.Site()

type_of_request = 8

sql_query = """select page.page_title as "prt_title",page.page_namespace as "prt_namespace"
from imagelinks
inner join page on page.page_id = imagelinks.il_from
where il_to in (
select page_title from page where page_title like "PAGE_TITLE_TEMP" and page_namespace in (6)
)
and il_from_namespace in (0,14,10)
"""

try:
session = Session(engine)

stmt = select(Request).filter(Request.status == Status.PENDING, Request.request_type == type_of_request).limit(20)

for request in session.scalars(stmt):
try:
gen = []
database = Database()
try:
database.query = sql_query.replace("PAGE_TITLE_TEMP", str(request.from_title).replace(" ", "_"))
database.get_content_from_database()
gen = database.result
except Exception as e:
print("an error occurred while getting the content from the database")
print(e)
logging.error("an error occurred while getting the content from the database")
logging.error(e)

pages = []
for row in gen:
page_title = str(row['prt_title'], 'utf-8')
pages.append(Page(
title=page_title,
namespace=row['prt_namespace']
))
request.status = Status.RECEIVED
request.pages = pages
session.commit()

except Exception as e:
session.rollback()
print("An error occurred while committing the changes:", e)
except Exception as e:
print(f"An error occurred: {e}")
87 changes: 87 additions & 0 deletions tasks/requests/replace_image/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import copy

import pywikibot
import wikitextparser as wtp

from core.utils.helpers import prepare_str


class ImageReplacer:
def __init__(self, site, page_name):
self.site = site
self.page_name = page_name
self.page = pywikibot.Page(site, page_name)
self.parsed = wtp.parse(self.page.text)
self.temp_text = copy.deepcopy(self.page.text)
self.template_list = [
prepare_str("صورة صفحة رئيسية")
]

def set_old_file_name(self, old_file_name):
self.old_file_name = prepare_str(old_file_name)
self.pure_old_file_name = old_file_name

def set_new_file_name(self, new_file_name):
self.new_file_name = prepare_str(new_file_name)
self.pure_new_file_name = new_file_name

def replace_image(self):
for wikilink in self.parsed.wikilinks:
# for ملف: namespace
if wikilink.title.startswith("ملف:"):
if prepare_str(wikilink.title) == prepare_str("ملف:" + self.old_file_name):
temp_wikilink = copy.deepcopy(wikilink)
temp_wikilink.title = "ملف:" + self.new_file_name
self.temp_text = self.temp_text.replace(str(wikilink), str(temp_wikilink))
# for File: namespace
elif wikilink.title.startswith("File:"):
if prepare_str(wikilink.title) == prepare_str("File:" + self.old_file_name):
temp_wikilink = copy.deepcopy(wikilink)
temp_wikilink.title = "File:" + self.new_file_name
self.temp_text = self.temp_text.replace(str(wikilink), str(temp_wikilink))

def replace_image_in_gallery_tag(self):
for tag in self.parsed.get_tags("gallery"):
tag_str = copy.deepcopy(tag.string).lower()
# for ملف: namespace in
if prepare_str("ملف:" + self.old_file_name) in prepare_str(tag_str):
tag_str = tag_str.replace("ملف:" + self.pure_old_file_name.lower(), "ملف:" + self.pure_new_file_name)
tag_str = tag_str.replace("ملف:" + self.old_file_name.lower(), "ملف:" + self.pure_new_file_name)
self.temp_text = self.temp_text.replace(str(tag.string), str(tag_str))
for tag in self.parsed.get_tags("gallery"):
# for File: namespace in
if prepare_str("File:" + self.old_file_name) in prepare_str(tag_str):
tag_str = tag_str.replace("File:".lower() + self.pure_old_file_name.lower(),
"File:" + self.pure_new_file_name)
tag_str = tag_str.replace("File:".lower() + self.old_file_name.lower(),
"File:" + self.pure_new_file_name)
self.temp_text = self.temp_text.replace(str(tag.string), str(tag_str))

def replace_image_in_imagemap_tag(self):
for tag in self.parsed.get_tags("imagemap"):
tag_str = copy.deepcopy(tag.string).lower()
# for ملف: namespace in
if prepare_str("ملف:" + self.old_file_name) in prepare_str(tag_str):
tag_str = tag_str.replace("ملف:" + self.pure_old_file_name.lower(), "ملف:" + self.pure_new_file_name)
tag_str = tag_str.replace("ملف:" + self.old_file_name.lower(), "ملف:" + self.pure_new_file_name)
self.temp_text = self.temp_text.replace(str(tag.string), str(tag_str))
for tag in self.parsed.get_tags("imagemap"):
# for File: namespace in
if prepare_str("File:" + self.old_file_name) in prepare_str(tag_str):
tag_str = tag_str.replace("File:".lower() + self.pure_old_file_name.lower(),
"File:" + self.pure_new_file_name)
tag_str = tag_str.replace("File:".lower() + self.old_file_name.lower(),
"File:" + self.pure_new_file_name)
self.temp_text = self.temp_text.replace(str(tag.string), str(tag_str))

def replace_image_in_custom_template(self):
for template in self.parsed.templates:
temp_template = copy.deepcopy(template)
if prepare_str(temp_template.name) in self.template_list:
for param in temp_template.arguments:
if prepare_str(param.value) == prepare_str(self.old_file_name):
param.value = self.pure_new_file_name
self.temp_text = self.temp_text.replace(str(template), str(temp_template))

def get_new_text(self):
return self.temp_text
50 changes: 50 additions & 0 deletions tasks/requests/replace_image/read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import pywikibot
from sqlalchemy.orm import Session

from tasks.requests.core.database.engine import engine
from tasks.requests.core.database.models import Request
from tasks.requests.core.module import RequestsPage, RequestsScanner

# Create an instance of the RequestsPage class
site = pywikibot.Site()

type_of_request = 8

try:

requests_page = RequestsPage(site)
requests_page.title = "ويكيبيديا:طلبات استبدال الصور"
requests_page.header_text = "{{/ترويسة}}"

requests_page.load_page()

if requests_page.check_user_edits(3000) and requests_page.check_user_groups(group='editor'):
scanner = RequestsScanner()
scanner.pattern = r"\*\s*\[\[:ملف:(?P<source>.*)\]\](?P<extra>.*)\[\[:ملف:(?P<destination>.*)\]\]"
scanner.scan(requests_page.get_page_text())

if scanner.have_requests:
requests_page.start_request()
try:
with Session(engine) as session:
for request in scanner.requests:
request_model = Request(
from_title=request['source'],
to_title=request['destination'],
from_namespace=6,
to_namespace=6,
request_type=type_of_request
)
session.add(request_model)
session.commit()
except Exception as e:
session.rollback()
print("An error occurred while committing the changes:", e)

else:
requests_page.move_to_talk_page()
else:
requests_page.move_to_talk_page()
# Get the page text after removing the header text
except Exception as e:
print(f"An error occurred: {e}")
59 changes: 59 additions & 0 deletions tasks/requests/replace_image/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import traceback

import pywikibot
from sqlalchemy import select, func, distinct
from sqlalchemy.orm import Session

from tasks.requests.core.database.engine import engine
from tasks.requests.core.database.models import Request, Status, Page
from tasks.requests.replace_image.models import ImageReplacer

# Create an instance of the RequestsPage class
site = pywikibot.Site()

type_of_request = 8

try:
session = Session(engine)

stmt = select(Request).join(Page).filter(Request.status == Status.RECEIVED, Page.status == Status.PENDING,
Request.request_type == type_of_request).group_by(Request).having(
func.count(Page.id) == func.count(distinct(Page.id))).limit(100)

for request in session.scalars(stmt):

pages = session.query(Page).filter(Page.request == request, Page.status == Status.PENDING).order_by(
Page.namespace.desc()).limit(10).all()

for page in pages:
try:

page_name = page.page_name
old_file_name = request.from_title
new_file_name = request.to_title

replacer = ImageReplacer(site, page_name)
replacer.set_old_file_name(old_file_name)
replacer.set_new_file_name(new_file_name)
replacer.replace_image()
replacer.replace_image_in_gallery_tag()
replacer.replace_image_in_imagemap_tag()
replacer.replace_image_in_custom_template()
new_text = replacer.get_new_text()

if replacer.page.text != new_text:
replacer.page.text = new_text
summary = "بوت:[[ويكيبيديا:طلبات استبدال الصور]] استبدال [[ملف:{}]] ب [[ملف:{}]] (v0.0.6) (beta)".format(
old_file_name, new_file_name)
replacer.page.save(summary=summary)

page.status = Status.COMPLETED
session.commit()
except Exception as e:
print(f"An error occurred where save : {e}")
just_the_string = traceback.format_exc()
print(just_the_string)
session.rollback()

except Exception as e:
print(f"An error occurred: {e}")