diff --git a/fastpath/clickhouse_init.sql b/fastpath/clickhouse_init.sql index da49958c..2cde490b 100644 --- a/fastpath/clickhouse_init.sql +++ b/fastpath/clickhouse_init.sql @@ -38,7 +38,12 @@ CREATE TABLE IF NOT EXISTS default.fastpath `blocking_type` String, `test_helper_address` LowCardinality(String), `test_helper_type` LowCardinality(String), - `ooni_run_link_id` Nullable(UInt64) + `ooni_run_link_id` Nullable(UInt64), + `is_verified` Int8, + `nym` Nullable(String), + `zkp_request` Nullable(String), + `age_range` Nullable(String), + `msm_range` Nullable(String), ) ENGINE = ReplacingMergeTree ORDER BY (measurement_start_time, report_id, input) @@ -202,4 +207,3 @@ CREATE TABLE IF NOT EXISTS default.fingerprints_http ) ENGINE = EmbeddedRocksDB PRIMARY KEY name; - diff --git a/fastpath/fastpath/core.py b/fastpath/fastpath/core.py index a06ca72c..161b030f 100644 --- a/fastpath/fastpath/core.py +++ b/fastpath/fastpath/core.py @@ -1616,7 +1616,7 @@ def flag_measurements_with_wrong_date(msm: dict, msmt_uid: str, scores: dict) -> scores["msg"] = "Measurement start time too old" def write_measurement_to_disk(msm_tup) -> None: - """Write this measurement to disk so that it can be + """Write this measurement to disk so that it can be processed by the measurement uploader Args: @@ -1633,7 +1633,7 @@ def write_measurement_to_disk(msm_tup) -> None: msmtdir = spooldir / "incoming" / dirname msmtdir.mkdir(parents=True, exist_ok=True) - try: + try: msmt_f_tmp = msmtdir / f"{msmt_uid}.post.tmp" msmt_f_tmp.write_bytes(data) msmt_f = msmtdir / f"{msmt_uid}.post" @@ -1654,7 +1654,14 @@ def process_measurement(msm_tup, buffer_writes=False) -> None: assert msmt_uid if measurement is None: measurement = ujson.loads(msm_jstr) - if sorted(measurement.keys()) == ["content", "format"]: + + is_verified = g(measurement, 'is_verified', False) + nym = g(measurement, 'nym') + zkp_request = g(measurement, 'zkp_request') + age_range = g(measurement, 'age_range') + msm_range = g(measurement, 
'msm_range') + + if "content" in measurement and "format" in measurement: measurement = unwrap_msmt(measurement) rid = measurement.get("report_id") inp = measurement.get("input") @@ -1742,6 +1749,11 @@ def process_measurement(msm_tup, buffer_writes=False) -> None: test_helper_address, test_helper_type, ooni_run_link_id, + is_verified, + nym, + zkp_request, + age_range, + msm_range, buffer_writes=buffer_writes, ) diff --git a/fastpath/fastpath/db.py b/fastpath/fastpath/db.py index e8decb6c..5ed0d9f6 100644 --- a/fastpath/fastpath/db.py +++ b/fastpath/fastpath/db.py @@ -10,7 +10,7 @@ from datetime import datetime from textwrap import dedent from urllib.parse import urlparse -from typing import List, Tuple, Dict, Optional +from typing import List, Tuple, Dict, Optional, Any import logging try: @@ -209,6 +209,11 @@ def clickhouse_upsert_summary( test_helper_address: str, test_helper_type: str, ooni_run_link_id: Optional[int], + is_verified: bool, + zkp_request: Optional[str], + nym: Optional[str], + age_range: Optional[Tuple[int, int]], + msm_range: Optional[Tuple[int, int]], buffer_writes=False, ) -> None: """Insert a row in the fastpath table. 
Overwrite an existing one.""" @@ -224,6 +229,10 @@ def nn(features: dict, k: str) -> str: def tf(v: bool) -> str: return "t" if v else "f" + def s_or_n(x : Optional[Any]) -> Optional[str]: + """Serialize to string if not None, return None otherwise""" + return ujson.dumps(x) if x is not None else None + test_name = msm.get("test_name", None) or "" input_, domain = extract_input_domain(msm, test_name) asn = int(msm["probe_asn"][2:]) # AS123 @@ -257,6 +266,11 @@ def tf(v: bool) -> str: test_helper_address=test_helper_address, test_helper_type=test_helper_type, ooni_run_link_id=ooni_run_link_id, + is_verified=tf(is_verified), + nym=nym, + zkp_request=zkp_request, + age_range=s_or_n(age_range), + msm_range=s_or_n(msm_range) ) if buffer_writes: diff --git a/fastpath/fastpath/tests/test_functional_nodb.py b/fastpath/fastpath/tests/test_functional_nodb.py index 80b658c7..5af5b33b 100644 --- a/fastpath/fastpath/tests/test_functional_nodb.py +++ b/fastpath/fastpath/tests/test_functional_nodb.py @@ -154,6 +154,11 @@ def test_score_web_connectivity_bug_610_2(fprints): "test_helper_address": "https://0.th.ooni.org", "test_helper_type": "https", "ooni_run_link_id": None, + "is_verified" : "f", + "nym" : None, + "zkp_request" : None, + "age_range" : None, + "msm_range" : None, } ] @@ -201,6 +206,11 @@ def test_score_browser_web(fprints): "test_runtime": 0.35740000000037253, "test_start_time": datetime.datetime(2023, 3, 20, 18, 26, 35), "test_version": "0.1.0", + "is_verified" : "f", + "nym" : None, + "zkp_request" : None, + "age_range" : None, + "msm_range" : None, }, ] @@ -252,6 +262,11 @@ def test_score_openvpn(): "test_helper_address": "", "test_helper_type": "", "ooni_run_link_id": None, + "is_verified" : "f", + "nym" : None, + "zkp_request" : None, + "age_range" : None, + "msm_range" : None, } ] diff --git a/fastpath/makefile b/fastpath/makefile index 10f18430..b0c87355 100644 --- a/fastpath/makefile +++ b/fastpath/makefile @@ -80,18 +80,18 @@ docker: # Runs docker in 
foreground, useful for checking errors in the image before it runs docker-fg: - docker compose --profile default up --build + docker compose --profile default up --build -# Runs both fastpath and the testing clickhous. +# Runs both fastpath and the testing clickhous. # Mind the fastpath configuration in fastpath.conf docker-all: docker-clickhouse echo "Waiting for clickhouse..." - sleep 4 - docker compose --profile default up --build -d + sleep 4 + docker compose --profile default up --build -d # Turns off every service docker-down: - docker compose --profile all down + docker compose --profile all down # If you need to test the fastpath locally you can use this rule to spawn the clickhouse database # locally and then use `make docker` or `make docker-fg` to start the fastpath container. Ex: @@ -109,15 +109,15 @@ docker-login: # Get logs from the fastpath docker service docker-logs: - docker compose logs fastpath -f + docker compose logs fastpath -f -# Get logs for a specified service. Example: +# Get logs for a specified service. Example: # `make docker-logs-for args="clickhouse-server"` docker-logs-for: - docker compose logs $(args) -f + docker compose logs $(args) -f # Used for actually building the fastpath docker image -docker-build: +docker-build: # We need to use tar -czh to resolve the common dir symlink tar -czh . 
| docker build \ --build-arg BUILD_LABEL=${BUILD_LABEL} \ diff --git a/ooniapi/common/src/common/alembic/versions/7e28b5d17a7f_add_server_state_table_for_anonymous_.py b/ooniapi/common/src/common/alembic/versions/7e28b5d17a7f_add_server_state_table_for_anonymous_.py new file mode 100644 index 00000000..99e35c2a --- /dev/null +++ b/ooniapi/common/src/common/alembic/versions/7e28b5d17a7f_add_server_state_table_for_anonymous_.py @@ -0,0 +1,71 @@ +"""Add server state table for anonymous credentials + +Revision ID: 7e28b5d17a7f +Revises: 8e7ecea5c2f5 +Create Date: 2025-10-08 10:36:55.796144 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +import sqlalchemy.schema as sc + + +# revision identifiers, used by Alembic. +revision: str = "7e28b5d17a7f" +down_revision: Union[str, None] = "8e7ecea5c2f5" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +# --- +ooniprobe_server_state_id_seq = sc.Sequence("ooniprobe_server_state_id_seq", start=1) +ooniprobe_manifest_id_seq = sc.Sequence("ooniprobe_manifest_id_seq", start=1) + + +def upgrade() -> None: + op.execute(sc.CreateSequence(ooniprobe_server_state_id_seq)) + + op.create_table( + "ooniprobe_server_state", + sa.Column( + "id", + sa.String(), + nullable=False, + server_default=ooniprobe_server_state_id_seq.next_value(), + primary_key=True, + ), + sa.Column("date_created", sa.DateTime(timezone=True), nullable=False), + sa.Column("secret_key", sa.String(), nullable=False), + sa.Column("public_parameters", sa.String(), nullable=False), + ) + + op.execute(sc.CreateSequence(ooniprobe_manifest_id_seq)) + op.create_table( + "ooniprobe_manifest", + sa.Column( + "version", + sa.String(), + nullable=False, + server_default=ooniprobe_manifest_id_seq.next_value(), + primary_key=True, + ), + sa.Column("date_created", sa.DateTime(timezone=True), nullable=False), + sa.Column("nym_scope", sa.String(), nullable=False), + 
sa.Column("submission_policy", sa.JSON(), nullable=False), + + sa.Column( + "server_state_id", + sa.String(), + sa.ForeignKey("ooniprobe_server_state.id"), + nullable=False, + ), + ) + + +def downgrade() -> None: + op.drop_table("ooniprobe_manifest") + op.drop_table("ooniprobe_server_state") + op.execute(sc.DropSequence(ooniprobe_server_state_id_seq)) + op.execute(sc.DropSequence(ooniprobe_manifest_id_seq)) diff --git a/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql b/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql index 7245f4ae..2ee79c98 100644 --- a/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql +++ b/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql @@ -36,13 +36,18 @@ CREATE TABLE IF NOT EXISTS default.fastpath `blocking_type` String, `test_helper_address` LowCardinality(String), `test_helper_type` LowCardinality(String), - `ooni_run_link_id` Nullable(UInt64) + `ooni_run_link_id` Nullable(UInt64), + `is_verified` Int8, + `nym` Nullable(String), + `zkp_request` Nullable(String), + `age_range` Nullable(String), + `msm_range` Nullable(String) ) ENGINE = ReplacingMergeTree ORDER BY (measurement_start_time, report_id, input) SETTINGS index_granularity = 8192; -CREATE TABLE IF NOT EXISTS default.citizenlab +CREATE TABLE IF NOT EXISTS default.citizenlab ( `domain` String, `url` String, diff --git a/ooniapi/services/ooniprobe/Dockerfile b/ooniapi/services/ooniprobe/Dockerfile index 42fae6d0..1c230535 100644 --- a/ooniapi/services/ooniprobe/Dockerfile +++ b/ooniapi/services/ooniprobe/Dockerfile @@ -1,5 +1,5 @@ # Python builder -FROM python:3.11-bookworm as builder +FROM python:3.11-bookworm AS builder ARG BUILD_LABEL=docker WORKDIR /build @@ -14,10 +14,10 @@ RUN find /build -type f -name '._*' -delete RUN echo "$BUILD_LABEL" > /build/src/ooniprobe/BUILD_LABEL -RUN hatch build +RUN make build ### Actual image running on the host -FROM 
python:3.11-bookworm as runner +FROM python:3.11-bookworm AS runner WORKDIR /app diff --git a/ooniapi/services/ooniprobe/pyproject.toml b/ooniapi/services/ooniprobe/pyproject.toml index b8c6fb32..c29f2c2e 100644 --- a/ooniapi/services/ooniprobe/pyproject.toml +++ b/ooniapi/services/ooniprobe/pyproject.toml @@ -28,7 +28,8 @@ dependencies = [ "fastapi-utils[all] ~= 0.8.0", "zstd ~= 1.5.7.2", "boto3 ~= 1.39.3", - "boto3-stubs[s3] ~= 1.39.3" + "boto3-stubs[s3] ~= 1.39.3", + "ooniauth-py @ https://github.com/ooni/userauth/raw/refs/heads/release-wheels/ooniauth-py/wheels/ooniauth_py-0.1.0-cp310-abi3-manylinux_2_34_x86_64.whl" ] readme = "README.md" @@ -82,6 +83,13 @@ test-cov = "pytest -s --full-trace --log-level=INFO --log-cli-level=INFO -v --s cov-report = ["coverage report"] cov = ["test-cov", "cov-report"] +# Allows specifyng dependencies as a direct reference like an URL +# or a file path. +# Required by ooniauth-py +# See: https://hatch.pypa.io/1.13/config/metadata/#allowing-direct-references +[tool.hatch.metadata] +allow-direct-references = true + [tool.pytest.ini_options] addopts = ["--import-mode=importlib"] diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/dependencies.py b/ooniapi/services/ooniprobe/src/ooniprobe/dependencies.py index 9f1e1327..4ab81506 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/dependencies.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/dependencies.py @@ -6,7 +6,7 @@ import geoip2.database from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import sessionmaker, Session from clickhouse_driver import Client as Clickhouse @@ -15,6 +15,7 @@ from .common.config import Settings from .common.dependencies import get_settings +from .models import OONIProbeServerState, OONIProbeManifest SettingsDep: TypeAlias = Annotated[Settings, Depends(get_settings)] @@ -23,13 +24,13 @@ def get_postgresql_session(settings: SettingsDep): engine = create_engine(settings.postgresql_url) SessionLocal = 
sessionmaker(autocommit=False, autoflush=False, bind=engine) - db = SessionLocal() try: yield db finally: db.close() +PostgresSessionDep = Annotated[Session, Depends(get_postgresql_session)] def get_cc_reader(settings: SettingsDep): db_path = Path(settings.geoip_db_dir, "cc.mmdb") @@ -64,3 +65,11 @@ def get_s3_client() -> S3Client: S3ClientDep = Annotated[S3Client, Depends(get_s3_client)] + + +def get_latest_manifest(session : PostgresSessionDep) -> OONIProbeManifest: + manifest = OONIProbeManifest.get_latest(session) + assert manifest is not None, "Uninitialized `OONIProbeServerState` table" + return manifest + +LatestManifestDep = Annotated[OONIProbeManifest, Depends(get_latest_manifest)] \ No newline at end of file diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/main.py b/ooniapi/services/ooniprobe/src/ooniprobe/main.py index ca5909be..ecf3285a 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/main.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/main.py @@ -48,6 +48,11 @@ async def lifespan( if repeating_tasks_active: await setup_repeating_tasks(settings) + db = get_postgresql_session(settings) + session = next(db) + models.OONIProbeManifest.init_table(session) + next(db, None) # closes the connection + yield @@ -133,6 +138,19 @@ async def health( if settings.prometheus_metrics_password == "CHANGEME": errors.append("bad_prometheus_password") + # check that we have at least one server state object for credentials validation + try: + state = db.query(models.OONIProbeManifest).limit(1).one_or_none() + if state is None: + errors.append("no_server_state_entry") + manifest = models.OONIProbeManifest.get_latest(db) + if manifest == None: + errors.append("no_manifest_entry") + except Exception as exc: + log.error("Error trying to retrieve server state") + log.error(exc) + pass # Database error already reported above + status = "ok" if len(errors) > 0: status = "fail" diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/metrics.py 
b/ooniapi/services/ooniprobe/src/ooniprobe/metrics.py new file mode 100644 index 00000000..568e5bf0 --- /dev/null +++ b/ooniapi/services/ooniprobe/src/ooniprobe/metrics.py @@ -0,0 +1,78 @@ +from prometheus_client import Counter, Info, Gauge + +class Metrics: + # -- < Measurement submission > ------------------------------------ + MSMNT_DISCARD_ASN0 = Counter( + "receive_measurement_discard_asn_0", + "How many measurements were discarded due to probe_asn == ASN0", + ) + + MSMNT_DISCARD_CC_ZZ = Counter( + "receive_measurement_discard_cc_zz", + "How many measurements were discarded due to probe_cc == ZZ", + ) + + MSMNT_RECEIVED_CNT = Counter( + "receive_measurement_count", + "Count of incomming measurements", + ) + + PROBE_CC_ASN_MATCH = Counter( + "probe_cc_asn_match", + "How many matches between reported and observed probe_cc and asn", + ) + + PROBE_CC_ASN_NO_MATCH = Counter( + "probe_cc_asn_nomatch", + "How many mismatches between reported and observed probe_cc and asn", + labelnames=["mismatch"], + ) + + MISSED_MSMNTS = Counter( + "missed_msmnts", "Measurements that failed to be sent to the fast path." + ) + + SEND_FASTPATH_FAILURE = Counter( + "measurement_fastpath_send_failure_count", + "How many times ooniprobe failed to send a measurement to fastpath", + ) + + SEND_S3_FAILURE = Counter( + "measurement_s3_upload_failure_count", + "How many times ooniprobe failed to send a measurement to s3. 
" + "Measurements are sent to s3 when they can't be sent to the fastpath", + ) + + # -- < Probe services > ---------------------------------------------------- + PROBE_LOGIN = Counter( + "probe_login_requests", + "Requests made to the probe login endpoint", + labelnames=["state", "detail", "login"], + ) + + PROBE_UPDATE_INFO = Info( + "probe_update_info", + "Information reported in the probe update endpoint", + ) + + CHECK_IN_TEST_LIST_COUNT = Gauge( + "check_in_test_list_count", "Amount of test lists present in each experiment" + ) + + GEOIP_ADDR_FOUND = Counter( + "geoip_ipaddr_found", + "If the ip address was found by geoip", + labelnames=["probe_cc", "asn"], + ) + + GEOIP_ADDR_NOT_FOUND = Counter( + "geoip_ipaddr_not_found", "We couldn't look up the IP address in the database" + ) + + GEOIP_CC_DIFFERS = Counter( + "geoip_cc_differs", "There's a mismatch between reported CC and observed CC" + ) + + GEOIP_ASN_DIFFERS = Counter( + "geoip_asn_differs", "There's a mismatch between reported ASN and observed ASN" + ) \ No newline at end of file diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/models.py b/ooniapi/services/ooniprobe/src/ooniprobe/models.py index 1cfc2e10..17457c5c 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/models.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/models.py @@ -1,10 +1,14 @@ from datetime import datetime -from typing import Dict +from typing import Self, Dict, Any from .common.models import UtcDateTime from .common.postgresql import Base -from sqlalchemy import ForeignKey, Sequence, String, Integer +from sqlalchemy import ForeignKey, Sequence, String, func from sqlalchemy.orm import Mapped -from sqlalchemy.orm import mapped_column, relationship +from sqlalchemy.orm import mapped_column, relationship, Session, joinedload +from sqlalchemy import desc +from ooniauth_py import ServerState +import logging +log = logging.getLogger(__name__) class OONIProbeVPNProvider(Base): @@ -49,3 +53,122 @@ class 
OONIProbeVPNProviderEndpoint(Base): provider_id = mapped_column(ForeignKey("ooniprobe_vpn_provider.id")) provider = relationship("OONIProbeVPNProvider", back_populates="endpoints") + + +class OONIProbeServerState(Base): + """ + Server state used for the anonymous credentials protocol. + Stores public parameters and secret key used for credential + generation and validation + """ + + __tablename__ = "ooniprobe_server_state" + + id: Mapped[str] = mapped_column( + String, + Sequence("ooniprobe_server_state_id_seq", start=1), + primary_key=True, + nullable=False, + ) + date_created: Mapped[datetime] = mapped_column(UtcDateTime(), default=func.now()) + secret_key: Mapped[str] = mapped_column() + public_parameters: Mapped[str] = mapped_column() + + @classmethod + def make_new_state(cls, session: Session, state : ServerState | None = None) -> Self: + """ + Creates a new state, saving it to db + If no state value is provided, create a default one with random values + """ + if state is None: + state = ServerState() + + new = cls( + secret_key=state.get_secret_key(), + public_parameters = state.get_public_parameters() + ) + session.add(new) + session.commit() + session.refresh(new) + + return new + + def to_protocol(self) -> ServerState: + """ + Converts this instance into a protocol object + """ + + return ServerState.from_creds(self.public_parameters, self.secret_key) + + @classmethod + def init_table(cls, session: Session): + """ + Creates a new entry if none exists. + """ + entry = session.query(cls).limit(1).one_or_none() + if entry is None: + log.info("No OONIProbeServerState entry found. 
Creating a new one...") + cls.make_new_state(session) + else: + log.info("OONIProbeServerState already initialized!") + + +class OONIProbeManifest(Base): + """ + Manifest used to share with clients the information they need to provide + registration and validation of measurement submissions + """ + + __tablename__ = "ooniprobe_manifest" + + version: Mapped[str] = mapped_column( + String, + Sequence("ooniprobe_manifest_id_seq", start=1), + primary_key=True, + nullable=False, + ) + date_created: Mapped[datetime] = mapped_column(UtcDateTime(), default=func.now()) + nym_scope: Mapped[str] = mapped_column(default="ooni.org/{probe_cc}/{probe_asn}") + submission_policy: Mapped[Dict[str, Any]] = mapped_column(default={}) + + server_state_id = mapped_column(ForeignKey("ooniprobe_server_state.id")) + server_state = relationship("OONIProbeServerState") + + @classmethod + def get_latest(cls, session: Session) -> Self | None: + return ( + session + .query(cls) + .options(joinedload(cls.server_state)) + .order_by(desc(cls.version)) + .limit(1) + .one_or_none() + ) + + @classmethod + def get_by_version(cls, session: Session, version: str) -> Self | None: + return ( + session + .query(cls) + .options(joinedload(cls.server_state)) + .where(cls.version == version) + .one_or_none() + ) + + @classmethod + def init_table(cls, session: Session): + """ + Creates a new manifest entry if none exists + """ + entry = session.query(cls).limit(1).one_or_none() + if entry is None: + log.info("No OONIProbeManifest entry found. 
Creating a new one...") + + # Make sure there's a server state + OONIProbeServerState.init_table(session) + state = session.query(OONIProbeServerState).order_by(desc(OONIProbeServerState.date_created)).limit(1).one() + entry = cls(server_state_id=state.id) + session.add(entry) + session.commit() + else: + log.info("OONIProbeServerState already initialized!") \ No newline at end of file diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py index f8c3e931..d575167a 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py @@ -1,25 +1,22 @@ -from typing import List, Annotated, Dict, Any +from typing import List, Dict, Any import asyncio -from pathlib import Path import logging from hashlib import sha512 -from urllib.request import urlopen from datetime import datetime, timezone import io import random -from fastapi import Request, Response, APIRouter, HTTPException, Header, Body +from fastapi import Request, Response, APIRouter, Header import httpx from pydantic import Field -from prometheus_client import Counter import zstd from ..utils import ( generate_report_id, - extract_probe_ipaddr, - lookup_probe_cc, - lookup_probe_network, + error, + compare_probe_msmt_cc_asn ) +from ..metrics import Metrics from ..dependencies import SettingsDep, ASNReaderDep, CCReaderDep, S3ClientDep from ..common.routers import BaseModel from ..common.utils import setnocacheresponse @@ -30,49 +27,6 @@ log = logging.getLogger(__name__) -class Metrics: - MSMNT_DISCARD_ASN0 = Counter( - "receive_measurement_discard_asn_0", - "How many measurements were discarded due to probe_asn == ASN0", - ) - - MSMNT_DISCARD_CC_ZZ = Counter( - "receive_measurement_discard_cc_zz", - "How many measurements were discarded due to probe_cc == ZZ", - ) - - MSMNT_RECEIVED_CNT = Counter( - "receive_measurement_count", - "Count of incomming measurements", - 
) - - PROBE_CC_ASN_MATCH = Counter( - "probe_cc_asn_match", - "How many matches between reported and observed probe_cc and asn", - ) - - PROBE_CC_ASN_NO_MATCH = Counter( - "probe_cc_asn_nomatch", - "How many mismatches between reported and observed probe_cc and asn", - labelnames=["mismatch"], - ) - - MISSED_MSMNTS = Counter( - "missed_msmnts", "Measurements that failed to be sent to the fast path." - ) - - SEND_FASTPATH_FAILURE = Counter( - "measurement_fastpath_send_failure_count", - "How many times ooniprobe failed to send a measurement to fastpath", - ) - - SEND_S3_FAILURE = Counter( - "measurement_s3_upload_failure_count", - "How many times ooniprobe failed to send a measurement to s3. " - "Measurements are sent to s3 when they can't be sent to the fastpath", - ) - - class OpenReportRequest(BaseModel): """ Open report @@ -187,10 +141,11 @@ async def receive_measurement( data = await request.body() if content_encoding == "zstd": try: + compressed_len = len(data) data = zstd.decompress(data) - ratio = len(data) / len(data) + ratio = compressed_len / len(data) log.debug(f"Zstd compression ratio {ratio}") - except Exception as e: + except Exception: log.info("Failed zstd decompression") error("Incorrect format") @@ -213,7 +168,7 @@ async def receive_measurement( async with httpx.AsyncClient() as client: resp = await client.post(url, content=data, timeout=59) - + assert resp.status_code == 200, resp.content return ReceiveMeasurementResponse(measurement_uid=msmt_uid) @@ -247,34 +202,3 @@ def close_report(report_id): Close a report """ return {} - - -def error(msg: str, status_code: int = 400): - raise HTTPException(status_code=status_code, detail=msg) - - -def compare_probe_msmt_cc_asn( - cc: str, - asn: str, - request: Request, - cc_reader: CCReaderDep, - asn_reader: ASNReaderDep, -): - """Compares CC/ASN from measurement with CC/ASN from HTTPS connection ipaddr - Generates a metric. 
- """ - try: - cc = cc.upper() - ipaddr = extract_probe_ipaddr(request) - db_probe_cc = lookup_probe_cc(ipaddr, cc_reader) - db_asn, _ = lookup_probe_network(ipaddr, asn_reader) - if db_asn.startswith("AS"): - db_asn = db_asn[2:] - if db_probe_cc == cc and db_asn == asn: - Metrics.PROBE_CC_ASN_MATCH.inc() - elif db_probe_cc != cc: - Metrics.PROBE_CC_ASN_NO_MATCH.labels(mismatch="cc").inc() - elif db_asn != asn: - Metrics.PROBE_CC_ASN_NO_MATCH.labels(mismatch="asn").inc() - except Exception: - pass diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py index 8d4bc2e6..740b805d 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py @@ -1,27 +1,37 @@ import logging from datetime import datetime, timezone, timedelta import time -from typing import List, Optional, Any, Dict, Tuple, Optional +from typing import List, Any, Dict, Tuple, Optional import random +import ujson +from hashlib import sha512 +import asyncio +import io import geoip2 import geoip2.errors -from fastapi import APIRouter, Depends, HTTPException, Response, Request -from prometheus_client import Counter, Info, Gauge +from fastapi import APIRouter, Depends, HTTPException, Response, Request, status, Header from pydantic import Field +from ooniauth_py import ProtocolError, CredentialError, DeserializationFailed +import httpx from ...utils import ( generate_report_id, extract_probe_ipaddr, lookup_probe_cc, lookup_probe_network, + error, + compare_probe_msmt_cc_asn ) -from ...dependencies import CCReaderDep, ASNReaderDep, ClickhouseDep, SettingsDep + +from ...dependencies import CCReaderDep, ASNReaderDep, ClickhouseDep, SettingsDep, LatestManifestDep, PostgresSessionDep, S3ClientDep +from ..reports import Metrics from ...common.dependencies import get_settings from ...common.routers import BaseModel from 
...common.auth import create_jwt, decode_jwt, jwt from ...common.config import Settings from ...common.utils import setnocacheresponse +from ...models import OONIProbeManifest, OONIProbeServerState from ...prio import generate_test_list router = APIRouter(prefix="/v1") @@ -29,41 +39,6 @@ log = logging.getLogger(__name__) -class Metrics: - PROBE_LOGIN = Counter( - "probe_login_requests", - "Requests made to the probe login endpoint", - labelnames=["state", "detail", "login"], - ) - - PROBE_UPDATE_INFO = Info( - "probe_update_info", - "Information reported in the probe update endpoint", - ) - - CHECK_IN_TEST_LIST_COUNT = Gauge( - "check_in_test_list_count", "Amount of test lists present in each experiment" - ) - - GEOIP_ADDR_FOUND = Counter( - "geoip_ipaddr_found", - "If the ip address was found by geoip", - labelnames=["probe_cc", "asn"], - ) - - GEOIP_ADDR_NOT_FOUND = Counter( - "geoip_ipaddr_not_found", "We couldn't look up the IP address in the database" - ) - - GEOIP_CC_DIFFERS = Counter( - "geoip_cc_differs", "There's a mismatch between reported CC and observed CC" - ) - - GEOIP_ASN_DIFFERS = Counter( - "geoip_asn_differs", "There's a mismatch between reported ASN and observed ASN" - ) - - class ProbeLogin(BaseModel): # Allow None username and password # to deliver informational 401 error when they're missing @@ -590,3 +565,234 @@ def random_web_test_helpers(th_list: List[str]) -> List[Dict]: for th_addr in th_list: out.append({"address": th_addr, "type": "https"}) return out + +# -- ------------------------------------ + +class ManifestResponse(BaseModel): + nym_scope: str + public_parameters: str + submission_policy: Dict[str, Any] + version: str + +@router.get("/manifest", tags=["anonymous_credentials"]) +def manifest(manifest : LatestManifestDep) -> ManifestResponse: + return ManifestResponse( + nym_scope="ooni.org/{probe_cc}/{probe_asn}", + public_parameters=manifest.server_state.public_parameters, + submission_policy={}, + version=manifest.version + ) + 
+class RegisterRequest(BaseModel):
+    """Body of a `/sign_credential` request."""
+
+    # Version of the manifest the client built its request against
+    manifest_version: str
+    # Registration request string produced by the client
+    credential_sign_request: str
+
+class RegisterResponse(BaseModel):
+    """Body of a successful `/sign_credential` response."""
+
+    # Output of the server-side `handle_registration_request`
+    credential_sign_response: str
+    # Value of the protocol state's `today()` when the request was handled
+    emission_day: int
+
+# TODO: choose a better name for this endpoint
+@router.post("/sign_credential", tags=["anonymous_credentials"])
+def sign_credential(register_request: RegisterRequest, session : PostgresSessionDep):
+    """
+    Handle an anonymous-credential registration request.
+
+    Looks up the manifest named by `manifest_version` and lets its server
+    state process `credential_sign_request`.
+
+    Raises:
+        HTTPException: 404 when no manifest has the requested version;
+            400 or 403 (see `to_http_exception`) when the protocol rejects
+            the request.
+    """
+    manifest = OONIProbeManifest.get_by_version(session, register_request.manifest_version)
+    if manifest is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail={
+                "error" : "manifest_not_found",
+                "message" : f"No manifest with version '{register_request.manifest_version}' was found"
+            }
+        )
+    state: OONIProbeServerState = manifest.server_state
+    protocol_state = state.to_protocol()
+
+    try:
+        resp = protocol_state.handle_registration_request(register_request.credential_sign_request)
+    except (ProtocolError, CredentialError, DeserializationFailed) as e:
+        raise to_http_exception(e)
+
+    return RegisterResponse(
+        credential_sign_response=resp,
+        emission_day=protocol_state.today()
+    )
+
+
+def to_http_exception(error: ProtocolError | CredentialError | DeserializationFailed):
+    """
+    Build (not raise) the HTTPException matching an anonymous-credential error.
+
+    `DeserializationFailed` maps to 400, every other supported error to 403.
+    The detail carries a short `error` tag and the stringified exception as
+    `message`.
+
+    Raises:
+        TypeError: if `error` is not one of the supported error types.
+    """
+    type_to_str = {
+        ProtocolError : "protocol_error",
+        DeserializationFailed : "deserialization_failed",
+        CredentialError : "credential_error"
+    }
+    # Exact type first; fall back to isinstance so that a subclass of a
+    # supported error is still translated instead of raising KeyError
+    type_str = type_to_str.get(type(error))
+    if type_str is None:
+        type_str = next(
+            (name for exc_type, name in type_to_str.items() if isinstance(error, exc_type)),
+            None,
+        )
+    if type_str is None:
+        raise TypeError(f"Unsupported error type: {type(error).__name__}")
+
+    status_code = status.HTTP_400_BAD_REQUEST if isinstance(error, DeserializationFailed) else status.HTTP_403_FORBIDDEN
+
+    return HTTPException(
+        status_code=status_code,
+        detail={
+            "error" : type_str,
+            "message" : str(error)
+        }
+    )
+
+class SubmitMeasurementRequest(BaseModel):
+    """Measurement submission together with its anonymous-credential proof."""
+
+    format: str
+    content: Dict[str, Any]
+    # -- < Anonymous Credentials > ----------------------
+    # not post quantum, in the future we might want to use a hashed key for storage
+    nym: str
+    zkp_request: str
+    age_range: Tuple[int,int]
+    msm_range: Tuple[int,int]
+    manifest_version: str
+
+class SubmitMeasurementResponse(BaseModel):
+    """
+    Acknowledge
+    """
+
+    measurement_uid: str | None = Field(
+        examples=["20210208220710.181572_MA_ndt_7888edc7748936bf"], default=None
+    )
+    is_verified: bool = Field(
+        description="if the ZKP was able to verify this request"
+    )
+    submit_response: str | None = Field(
+        description="Anonymous credential verification response. Null if verification failed"
+    )
+
+@router.post("/submit_measurement/{report_id}")
+async def submit_measurement(
+    report_id: str,
+    request: Request,
+    submit_request: SubmitMeasurementRequest,
+    response: Response,
+    cc_reader: CCReaderDep,
+    asn_reader: ASNReaderDep,
+    settings: SettingsDep,
+    s3_client: S3ClientDep,
+    session: PostgresSessionDep,
+    content_encoding: str = Header(default=None),
+) -> SubmitMeasurementResponse | Dict[str, Any]:
+    """
+    Submit measurement
+
+    Verifies the anonymous-credential proof sent with the measurement,
+    stores the outcome in the payload as `is_verified` and forwards the
+    payload to the fastpath (up to three attempts). A failed proof does not
+    reject the measurement. If the fastpath cannot be reached the payload is
+    uploaded to the failed-reports S3 bucket instead.
+
+    Returns an empty object when the measurement is discarded (ASN 0 or
+    CC "ZZ") or could not be delivered to the fastpath.
+
+    Raises a 400 HTTPException for a malformed report_id, an unknown manifest
+    version, or a content without string `probe_cc` / `probe_asn`.
+    """
+    setnocacheresponse(response)
+    empty_measurement = {}
+    try:
+        # Six "_"-separated fields are required; only three of them are used
+        _, test_name, cc, asn, _, _ = report_id.split("_")
+    except ValueError:
+        log.info("Unexpected report_id %r", report_id[:200])
+        error("Incorrect format")
+
+    # TODO validate the timestamp?
+    good = len(cc) == 2 and test_name.isalnum() and 1 < len(test_name) < 30
+    if not good:
+        log.info("Unexpected report_id %r", report_id[:200])
+        error("Incorrect format")
+
+    try:
+        asn_i = int(asn)
+    except ValueError:
+        log.info("ASN value not parsable %r", asn)
+        error("Incorrect format")
+
+    if asn_i == 0:
+        log.info("Discarding ASN == 0")
+        Metrics.MSMNT_DISCARD_ASN0.inc()
+        return empty_measurement
+
+    if cc.upper() == "ZZ":
+        log.info("Discarding CC == ZZ")
+        Metrics.MSMNT_DISCARD_CC_ZZ.inc()
+        return empty_measurement
+
+    # Retrieve manifest known by the client
+    manifest = OONIProbeManifest.get_by_version(session, submit_request.manifest_version)
+    if manifest is None:
+        error({
+            "detail" : f"No manifest with version '{submit_request.manifest_version}' was found",
+            "error" : "manifest_not_found"
+        })
+
+    # Client-supplied data: reject bad input explicitly instead of asserting,
+    # which would surface as a 500 and disappears entirely under `python -O`
+    probe_cc = submit_request.content.get("probe_cc")
+    probe_asn = submit_request.content.get("probe_asn")
+    if not isinstance(probe_cc, str) or not isinstance(probe_asn, str):
+        log.info("Missing or invalid probe_cc / probe_asn in measurement content")
+        error("Incorrect format")
+
+    # Run verification
+    state: OONIProbeServerState = manifest.server_state
+    protocol_state = state.to_protocol()
+
+    try:
+        submit_response = protocol_state.handle_submit_request(
+            submit_request.nym,
+            submit_request.zkp_request,
+            probe_cc,
+            probe_asn,
+            list(submit_request.age_range),
+            list(submit_request.msm_range),
+        )
+        is_verified = True
+    except (DeserializationFailed, ProtocolError, CredentialError) as e:
+        # proof failed
+        # TODO Q: should we add a "why not verified" field to the measurement?
+        log.error(f"ZKP Failed: {e}")
+        is_verified = False
+        submit_response = None
+
+    data = submit_request.model_dump()
+
+    # Add verification-related data.
+    data['is_verified'] = is_verified
+    # Serialize straight to bytes. Writing through an unflushed TextIOWrapper
+    # leaves small payloads in the wrapper's buffer, so the underlying
+    # BytesIO would still be empty when read back.
+    data_bin = ujson.dumps(data).encode("utf-8")
+
+    # Write the whole body of the measurement in a directory based on a 1-hour
+    # time window
+    now = datetime.now(timezone.utc)
+    h = sha512(data_bin).hexdigest()[:16]
+    ts = now.strftime("%Y%m%d%H%M%S.%f")
+
+    # msmt_uid is a unique id based on upload time, cc, testname and hash
+    msmt_uid = f"{ts}_{cc}_{test_name}_{h}"
+    Metrics.MSMNT_RECEIVED_CNT.inc()
+
+    compare_probe_msmt_cc_asn(cc, asn, request, cc_reader, asn_reader)
+    # Use exponential back off with jitter between retries
+    N_RETRIES = 3
+    url = f"{settings.fastpath_url}/{msmt_uid}"
+    for t in range(N_RETRIES):
+        try:
+            async with httpx.AsyncClient() as client:
+                resp = await client.post(url, content=data_bin, timeout=59)
+
+            # Explicit check: an assert would be stripped under `python -O`
+            # and a failed delivery would then be reported as a success
+            if resp.status_code != 200:
+                raise RuntimeError(
+                    f"Unexpected fastpath status {resp.status_code}: {resp.content!r}"
+                )
+            return SubmitMeasurementResponse(measurement_uid=msmt_uid, is_verified=is_verified, submit_response=submit_response)
+
+        except Exception as exc:
+            log.error(
+                f"[Try {t+1}/{N_RETRIES}] Error trying to send measurement to the fastpath ({settings.fastpath_url}). Error: {exc}"
+            )
+            sleep_time = random.uniform(0, min(3, 0.3 * 2 ** t))
+            await asyncio.sleep(sleep_time)
+
+    Metrics.SEND_FASTPATH_FAILURE.inc()
+
+    # wasn't possible to send msmnt to fastpath, try to send it to s3
+    try:
+        # Fresh buffer positioned at offset 0 so the whole payload is uploaded
+        s3_client.upload_fileobj(
+            io.BytesIO(data_bin), Bucket=settings.failed_reports_bucket, Key=report_id
+        )
+    except Exception as exc:
+        log.error(f"Unable to upload measurement to s3. Error: {exc}")
+        Metrics.SEND_S3_FAILURE.inc()
+
+    log.error(f"Unable to send report to fastpath. report_id: {report_id}")
+    Metrics.MISSED_MSMNTS.inc()
+    return empty_measurement
diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/utils.py b/ooniapi/services/ooniprobe/src/ooniprobe/utils.py
index 311c075b..a5b8dcbb 100644
--- a/ooniapi/services/ooniprobe/src/ooniprobe/utils.py
+++ b/ooniapi/services/ooniprobe/src/ooniprobe/utils.py
@@ -9,13 +9,14 @@ from datetime import datetime, timezone
 import itertools
 import logging
-from typing import Dict, List, Mapping, TypedDict, Tuple
+from typing import Dict, List, TypedDict, Tuple, Any, NoReturn
 
-from fastapi import Request
+from fastapi import Request, HTTPException
 from sqlalchemy.orm import Session
 import pem
 import httpx
 
+from .metrics import Metrics
 from .common.config import Settings
 from ooniprobe.models import OONIProbeVPNProvider, OONIProbeVPNProviderEndpoint
 from .dependencies import CCReaderDep, ASNReaderDep
 
@@ -144,3 +145,39 @@ def lookup_probe_network(ipaddr: str, asn_reader: ASNReaderDep) -> Tuple[str, st
         "AS{}".format(resp.autonomous_system_number),
         resp.autonomous_system_organization or "0",
     )
+
+
+def error(msg: str | Dict[str, Any], status_code: int = 400) -> NoReturn:
+    """Abort the request by raising an HTTPException with `msg` as its detail."""
+    raise HTTPException(status_code=status_code, detail=msg)
+
+
+def compare_probe_msmt_cc_asn(
+    cc: str,
+    asn: str,
+    request: Request,
+    cc_reader: CCReaderDep,
+    asn_reader: ASNReaderDep,
+):
+    """Compares CC/ASN from measurement with CC/ASN from HTTPS connection ipaddr
+    Generates a metric.
+
+    Best effort: any failure during the lookup is logged at debug level and
+    otherwise ignored.
+    """
+    try:
+        cc = cc.upper()
+        ipaddr = extract_probe_ipaddr(request)
+        db_probe_cc = lookup_probe_cc(ipaddr, cc_reader)
+        db_asn, _ = lookup_probe_network(ipaddr, asn_reader)
+        if db_asn.startswith("AS"):
+            db_asn = db_asn[2:]
+        if db_probe_cc == cc and db_asn == asn:
+            Metrics.PROBE_CC_ASN_MATCH.inc()
+        elif db_probe_cc != cc:
+            Metrics.PROBE_CC_ASN_NO_MATCH.labels(mismatch="cc").inc()
+        elif db_asn != asn:
+            Metrics.PROBE_CC_ASN_NO_MATCH.labels(mismatch="asn").inc()
+    except Exception:
+        # Deliberately swallowed so that this metric can never fail the request
+        logging.getLogger(__name__).debug("CC/ASN comparison failed", exc_info=True)
diff --git a/ooniapi/services/ooniprobe/tests/conftest.py b/ooniapi/services/ooniprobe/tests/conftest.py
index f1543893..1b0c3804 100644
--- a/ooniapi/services/ooniprobe/tests/conftest.py
+++ b/ooniapi/services/ooniprobe/tests/conftest.py
@@ -1,4 +1,3 @@
-from tempfile import tempdir
 import pathlib
 from pathlib import Path
 import pytest
@@ -16,6 +15,8 @@ from ooniprobe.dependencies import get_s3_client
 
 from ooniprobe.main import app
 from ooniprobe.download_geoip import try_update
+from ooniprobe.dependencies import get_postgresql_session
+from ooniprobe.models import OONIProbeManifest
 
 
 def make_override_get_settings(**kw):
@@ -78,7 +79,7 @@ def client_with_bad_settings():
 @pytest.fixture(scope="session")
 def fixture_path():
     """
-    Directory for this fixtures used to store temporary data, will be 
+    Directory for this fixtures used to store temporary data, will be
     deleted after the tests are finished
     """
     FIXTURE_PATH = Path(os.path.dirname(os.path.realpath(__file__))) / "data"
@@ -100,6 +101,13 @@ def geoip_db_dir(fixture_path):
 def client(clickhouse_server, test_settings, geoip_db_dir):
     app.dependency_overrides[get_settings] = test_settings
     app.dependency_overrides[get_s3_client] = get_s3_client_mock
+
+    # Initialize server state
+    db = get_postgresql_session(test_settings())
+    session = next(db)
+    OONIProbeManifest.init_table(session)
+    next(db, None)
+
     # lifespan won't run so do this here to have the DB
     try_update(geoip_db_dir)
     client = TestClient(app)
@@ -169,9 +177,9 @@ def fastpath_server(docker_ip, docker_services):
     )
     yield url
 
-def is_fastpath_running(url: str) -> bool: 
-    try: 
+def is_fastpath_running(url: str) -> bool:
+    try:
         resp = urlopen(url)
         return resp.status == 200
-    except:
+    except Exception:
         return False
\ No newline at end of file
diff --git a/ooniapi/services/ooniprobe/tests/fakepath/main.py b/ooniapi/services/ooniprobe/tests/fakepath/main.py
index 6ffe633e..8d7505c5 100644
--- a/ooniapi/services/ooniprobe/tests/fakepath/main.py
+++ b/ooniapi/services/ooniprobe/tests/fakepath/main.py
@@ -11,5 +11,8 @@
 
 
 @app.get("/")
 def health():
-    return 
+    return
+@app.post("/{msmt_uid}")
+def receive_msm():
+    return {}
diff --git a/ooniapi/services/ooniprobe/tests/integ/test_reports.py b/ooniapi/services/ooniprobe/tests/integ/test_reports.py
index 4abf3d09..01f3bd19 100644
--- a/ooniapi/services/ooniprobe/tests/integ/test_reports.py
+++ b/ooniapi/services/ooniprobe/tests/integ/test_reports.py
@@ -60,10 +60,10 @@ def test_collector_upload_msmt_valid(client):
 
     msmt = dict(test_keys={})
     c = postj(client, f"/report/{rid}", {"format":"json", "content":msmt})
-    assert c == {}
+    assert c['measurement_uid'].endswith("_IE_webconnectivity_e7889aeba0b36729"), c
 
     c = postj(client, f"/report/{rid}/close", json={})
-    assert c == {}
+    assert c == {}, c
 
 def test_collector_upload_msmt_valid_zstd(client):
     rid = "20230101T000000Z_integtest_IT_1_n1_integtest0000000"
@@ -71,4 +71,5 @@ def test_collector_upload_msmt_valid_zstd(client):
     zmsmt = zstd.compress(msmt)
     headers = [("Content-Encoding", "zstd")]
     c = post(client, f"/report/{rid}", zmsmt, headers=headers)
-    assert c == {}
+    assert len(c) == 1
+    assert c['measurement_uid'].endswith("_IT_integtest_50be3cd5406bca65"), c
diff --git a/ooniapi/services/ooniprobe/tests/test_anoncred.py b/ooniapi/services/ooniprobe/tests/test_anoncred.py
new file mode 100644
index 00000000..6247ab26
--- /dev/null
+++ b/ooniapi/services/ooniprobe/tests/test_anoncred.py
@@ -0,0 +1,143 @@
+from typing import Any, Dict, Tuple
+from httpx import Client
+from fastapi import status
+from ooniprobe.models import OONIProbeServerState, OONIProbeManifest
+from ooniauth_py import UserState, ServerState
+
+def getj(client : Client, url: str, params: Dict[str, Any] | None = None) -> Dict[str, Any]:
+    """GET `url` with optional query `params`, assert a 200 and return the JSON body."""
+    resp = client.get(url, params=params)
+    assert resp.status_code == status.HTTP_200_OK, f"Unexpected status code: {resp.status_code} - {url}. {resp.content}"
+    return resp.json()
+
+def postj(client : Client, url: str, json: Dict[str, Any] | None = None, headers: Dict[str, Any] | None = None) -> Dict[str, Any]:
+    """POST `json` to `url`, assert a 200 and return the JSON body."""
+    resp = client.post(url, json=json, headers=headers)
+    assert resp.status_code == status.HTTP_200_OK, f"Unexpected status code: {resp.status_code} - {url}. {resp.content}"
+    return resp.json()
+
+def post(client, url, data, headers=None):
+    """POST raw `data` to `url`, assert a 200 and return the JSON body."""
+    response = client.post(url, data=data, headers=headers)
+    assert response.status_code == 200
+    return response.json()
+
+def test_manifest_basic(client, db):
+    """The manifest endpoint returns the stored public parameters and the latest version."""
+    latest = db.query(OONIProbeServerState).limit(1).one_or_none()
+    manifest = OONIProbeManifest.get_latest(db)
+    assert latest is not None, "Server state not initialized"
+    assert manifest is not None, "Manifest not initialized"
+
+    m = getj(client, "/api/v1/manifest")
+
+    assert latest.public_parameters == m['public_parameters']
+    assert manifest.version == m['version']
+
+def test_registration_basic(client):
+    """A registration response from the server is accepted by the user state."""
+    manifest = getj(client, "/api/v1/manifest")
+
+    user_state = UserState(manifest['public_parameters'])
+    sign_req = user_state.make_registration_request()
+    resp = postj(client, "/api/v1/sign_credential", json={
+        "credential_sign_request" : sign_req,
+        "manifest_version" : manifest['version']
+    })
+    # should be able to verify this credential
+    user_state.handle_registration_response(resp['credential_sign_response']) # should not crash
+
+def test_registration_errors(client):
+    """Unknown manifest, foreign public parameters and corrupted requests are rejected."""
+    bad_version = "999"
+    resp = client.post("/api/v1/sign_credential", json={
+        "credential_sign_request" : "doesntmatter",
+        "manifest_version" : bad_version
+    })
+    # Bad manifest version should raise 404
+    assert resp.status_code == 404, resp.content
+    j = resp.json()
+    assert 'error' in j['detail'] and 'message' in j['detail'], j
+    assert j['detail']['error'] == "manifest_not_found"
+
+    # Not using the right public params should not verify
+    manifest = getj(client, "/api/v1/manifest")
+    bad_server = ServerState()
+    user = UserState(bad_server.get_public_parameters())
+    resp = client.post("/api/v1/sign_credential", json={
+        "credential_sign_request" : user.make_registration_request(),
+        "manifest_version" : manifest['version']
+    })
+
+    assert resp.status_code == status.HTTP_403_FORBIDDEN, resp.content
+    j = resp.json()
+    assert j['detail']['error'] == 'protocol_error'
+
+    # Changing random characters should mess with the serialization
+    user = UserState(manifest['public_parameters'])
+    sign_req = user.make_registration_request()
+    bad = "bad"
+    assert len(sign_req) >= len(bad), sign_req
+    sign_req = bad + sign_req[len(bad):]
+    resp = client.post("/api/v1/sign_credential", json={
+        "credential_sign_request" : sign_req,
+        "manifest_version" : manifest['version']
+    })
+
+    assert resp.status_code == status.HTTP_400_BAD_REQUEST, resp.content
+    j = resp.json()
+    assert j['detail']['error'] == 'deserialization_failed', j
+
+def test_submission_basic(client):
+    """A measurement submitted with a valid proof is reported as verified."""
+    # open report
+    j = {
+        "data_format_version": "0.2.0",
+        "format": "json",
+        "probe_asn": "AS34245",
+        "probe_cc": "IE",
+        "software_name": "miniooni",
+        "software_version": "0.17.0-beta",
+        "test_name": "web_connectivity",
+        "test_start_time": "2020-09-09 14:11:11",
+        "test_version": "0.1.0",
+    }
+    resp = postj(client, "/report", json=j)
+    rid = resp.pop("report_id")
+
+    # Create user
+    user, manifest_version, emission_day = setup_user(client)
+
+    submit_request = user.make_submit_request("IE", "AS34245", emission_day)
+    msm = {
+        "format": "json",
+        "content": {
+            "test_name": "web_connectivity",
+            "probe_asn": "AS34245",
+            "probe_cc": "IE",
+            "test_start_time": "2020-09-09 14:11:11",
+        },
+        "nym": submit_request.nym,
+        "zkp_request": submit_request.request,
+        "age_range": [emission_day - 30, emission_day + 1],
+        "msm_range": [0, 100],
+        "manifest_version": manifest_version
+    }
+    c = postj(client, f"/api/v1/submit_measurement/{rid}", msm)
+    assert c['is_verified'] is True
+
+    assert c['submit_response'], "Submit response should not be null if the proof was verified"
+    user.handle_submit_response(c['submit_response'])
+
+def setup_user(client) -> Tuple[UserState, str, int]:
+    """Register a fresh user; return (user, manifest version, emission day)."""
+    manifest = getj(client, "/api/v1/manifest")
+    user = UserState(manifest['public_parameters'])
+    req = user.make_registration_request()
+    resp = postj(client, "/api/v1/sign_credential", json = {
+        "credential_sign_request" : req,
+        "manifest_version" : manifest['version']
+    })
+    user.handle_registration_response(resp['credential_sign_response'])
+
+    return (user, manifest['version'], resp['emission_day'])