"""Email validation APIs"""

from contextlib import contextmanager
from csv import DictWriter
from dataclasses import dataclass, field
import json
import logging
import os
import tempfile
import typing
from urllib.parse import urljoin
from zipfile import ZipFile

from anymail.utils import get_anymail_setting
from django.db import transaction
import requests

from authentication.models import EmailValidation
from open_discussions.utils import chunks


log = logging.getLogger()

# the mailgun limit is 25MB, but we set a lower limit
# so that we don't have to count bytes before writing
# MB => kB => B
MAX_CSV_FILE_SIZE = 24 * 1000 * 1000
VALIDATION_API_URL = "https://api.mailgun.net/v4/address/validate/bulk/"


@dataclass
class CsvBatchFile:
    """Dataclass for a CSV batch"""

    name: str
    file: typing.TextIO
    writer: DictWriter
    start_position: int = field(init=False)

    def __post_init__(self):
        """Set the start position after init"""
        self.start_position = self.file.tell()

    def has_data(self) -> bool:
        """Return true if data has been written"""
        return self.file.tell() > self.start_position

    def is_full(self) -> bool:
        """Return true if the size of the file is at or past the limit"""
        return self.file.tell() >= MAX_CSV_FILE_SIZE


@contextmanager
def csv_batch_file(name: str):
    """
    Get a temporary file and CSV writer

    Args:
        name(str): the name of the operation/upload

    Returns:
        CsvBatchFile: the batch and associated state and files
    """
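    # delete=False keeps the temp file on disk after this context exits so that
    # send_to_mailgun() can reopen it by name via csv_batch.file.name
    # (the file is not cleaned up by this module)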
    with tempfile.NamedTemporaryFile(
        prefix=name, suffix=".csv", mode="w", delete=False
    ) as file:
        writer = DictWriter(file, fieldnames=["email"])
        writer.writeheader()

        yield CsvBatchFile(name, file, writer)


def get_api_key():
    """Return the Mailgun API key from the Anymail settings"""
    return get_anymail_setting("API_KEY", esp_name="MAILGUN")


def send_to_mailgun(csv_batch):
    """
    Send the CSV file to mailgun

    Args:
        csv_batch(CsvBatchFile): the CSV batch to upload
    """
    with open(csv_batch.file.name, "r") as csv_file:
        # send the temporary file up to mailgun
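        # the batch name appended to VALIDATION_API_URL identifies the Mailgun
        # validation list/job; get_validation_result() later queries the same name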
        resp = requests.post(
            urljoin(VALIDATION_API_URL, csv_batch.name),
            auth=("api", get_api_key()),
            files={"file": ("file", csv_file.read())},
            timeout=60 * 5,
        )
        resp.raise_for_status()


def batch_validation_csv_files(
    users,
    list_name_base: str,
    *,
    upload_batch_size: int,
):
    """
    Yields CSV files to upload to mailgun

    Args:
        users(QuerySet): users whose email addresses should be validated
        list_name_base(str): base name used for each batch's list name
        upload_batch_size(int): maximum number of users to include per upload

    Yields:
        CsvBatchFile: a filled batch file, flushed to disk and ready to upload
    """

    # create the iterator here so it can be reused across while loop steps
    user_chunks_iter = chunks(users.only("email"), chunk_size=1000)

    batch_idx = 1
    while True:
        with csv_batch_file(f"{list_name_base}-{batch_idx}") as csv_batch:
            # this transaction will mainly get rolled back if there's an error within the yield
            with transaction.atomic():
                batch_size = 0
                for chunk in user_chunks_iter:
                    csv_batch.writer.writerows(
                        [{"email": user.email} for user in chunk]
                    )

                    EmailValidation.objects.bulk_create(
                        [
                            EmailValidation(
                                user=user, email=user.email, list_name=csv_batch.name
                            )
                            for user in chunk
                        ]
                    )

                    batch_size += len(chunk)

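                    # stop filling this batch once the CSV nears the size limit or
                    # enough users have been written; any chunks left in
                    # user_chunks_iter are picked up by the next batch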
                    if csv_batch.is_full() or batch_size >= upload_batch_size:
                        break

            if not csv_batch.has_data():
                return

            # ensure everything is flushed to disk before we yield
            csv_batch.file.flush()

            yield csv_batch

        batch_idx += 1


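# Typical flow (a sketch; the orchestration is assumed to live outside this
# module): start_user_email_validation() uploads the address batches and records
# an EmailValidation row per user; once Mailgun finishes validating a list, the
# results download URL (obtained by polling get_validation_result() or via a
# webhook) is passed to process_validation_results().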
def start_user_email_validation(
    users,
    list_name_base: str,
    *,
    upload_batch_size: int,
):
    """
    Start the bulk verification of users' email addresses.
    """
    for csv_batch in batch_validation_csv_files(
        users,
        list_name_base,
        upload_batch_size=upload_batch_size,
    ):
        send_to_mailgun(csv_batch)


def get_validation_result(list_name: str) -> dict:
    """Get the results for a list validation"""
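    # the response is expected to describe the validation job for the named list,
    # including (once complete) where to download the full results that
    # process_validation_results() consumes; exact fields depend on the Mailgun API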
    resp = requests.get(
        urljoin(VALIDATION_API_URL, list_name),
        auth=("api", get_api_key()),
        timeout=60 * 5,
    )
    resp.raise_for_status()
    return resp.json()


def fetch_result_zip_file(url: str, temp_dir: str) -> str:
    """Fetch the result file and write it out locally"""
    filename = os.path.join(temp_dir, "results.json.zip")

    with requests.get(url, stream=True, timeout=60 * 5) as resp:
        resp.raise_for_status()
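        # stream the zip to disk in 8KB chunks so the whole file is never held
        # in memory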
        with open(filename, mode="wb") as zip_file:
            for chunk in resp.iter_content(chunk_size=8192):
                zip_file.write(chunk)
    return filename


def unzip_result_file(zip_filename: str, temp_dir: str):
    """Extract the result file(s) from the zip and yield their paths"""
    with ZipFile(zip_filename, mode="r") as zip_file:
        # there's only 1 file in the zip
        for result_file in zip_file.namelist():
            zip_file.extract(result_file, path=temp_dir)

            yield os.path.join(temp_dir, result_file)


def process_result_file(path: str):
    """Process a results JSON file"""
    with open(path, "r") as f:
        results = json.load(f)

    log.info("Processing %s results from %s", len(results), path)

    for result in results:
        email = result["address"]

        validation_result = result["result"]

        validations = EmailValidation.objects.filter(email=email)

        for validation in validations:
            validation.result = validation_result
            validation.save()

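        # deactivate the account(s) behind any address Mailgun reports as
        # undeliverable or do-not-send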
        if validation_result in ("undeliverable", "do_not_send"):
            for validation in validations:
                user = validation.user
                user.is_active = False
                user.save()


def process_validation_results(url: str):
    """Download and process the result file"""
    # use a temporary directory that is cleaned up once processing finishes
    with tempfile.TemporaryDirectory() as temp_dir:
        zip_filename = fetch_result_zip_file(url, temp_dir)

        for result_file in unzip_result_file(zip_filename, temp_dir):
            process_result_file(result_file)