diff --git a/docker/granular/granular.py b/docker/granular/granular.py
index b01cbc5..a57e4c8 100644
--- a/docker/granular/granular.py
+++ b/docker/granular/granular.py
@@ -1064,7 +1064,7 @@ class NewBackup(flask_restful.Resource):
     def __init__(self):
         self.log = logging.getLogger("NewBackup")
         self.allowed_fields = ["storageName", "blobPath", "databases"]
-        self.s3 = storage_s3.AwsS3Vault() if os.environ.get("STORAGE_TYPE") == "s3" else None
+        self.s3 = storage_s3.AwsS3Vault(prefix="")
 
     @staticmethod
     def get_endpoints():
@@ -1142,7 +1142,7 @@ class NewBackupStatus(flask_restful.Resource):
 
     def __init__(self):
         self.log = logging.getLogger("NewBackupStatus")
-        self.s3 = storage_s3.AwsS3Vault() if os.environ.get("STORAGE_TYPE") == "s3" else None
+        self.s3 = storage_s3.AwsS3Vault(prefix="")
 
     @staticmethod
     def get_endpoints():
@@ -1273,7 +1273,7 @@ class NewRestore(flask_restful.Resource):
 
     def __init__(self):
         self.log = logging.getLogger("NewRestore")
-        self.s3 = storage_s3.AwsS3Vault() if os.environ.get("STORAGE_TYPE") == "s3" else None
+        self.s3 = storage_s3.AwsS3Vault(prefix="")
 
     @staticmethod
     def get_endpoints():
@@ -1382,10 +1382,13 @@ def post(self, backup_id):
         for prev in (requested or []):
             prev_name = prev or ""
             restored_as = databases_mapping.get(prev_name) if isinstance(databases_mapping, dict) else None
+            status = "notStarted" if prev_name else ""
+            if dry_run:
+                status = "Successful"
             dbs_out.append({
                 "previousDatabaseName": prev_name,
                 "databaseName": restored_as or prev_name,
-                "status": "notStarted" if prev_name else "",
+                "status": status,
                 "creationTime": created_iso
             })
 
@@ -1416,7 +1419,8 @@ def post(self, backup_id):
         }
 
         if dry_run:
-            return enriched, http.client.OK
+            enriched["status"] = "Successful"
+            return enriched, http.client.OK
 
         status_path = backups.build_restore_status_file_path(backup_id, tracking_id, namespace)
 
@@ -1440,7 +1444,7 @@ class NewRestoreStatus(flask_restful.Resource):
 
     def __init__(self):
         self.log = logging.getLogger("NewRestoreStatus")
-        self.s3 = storage_s3.AwsS3Vault() if os.environ.get("STORAGE_TYPE") == "s3" else None
+        self.s3 = storage_s3.AwsS3Vault(prefix="")
 
     @staticmethod
     def get_endpoints():
diff --git a/docker/granular/pg_backup.py b/docker/granular/pg_backup.py
index d5ccb79..d51900b 100644
--- a/docker/granular/pg_backup.py
+++ b/docker/granular/pg_backup.py
@@ -53,7 +53,11 @@ def __init__(self, databases, backup_request, blob_path=None):
         self.blob_path = blob_path
         self.backup_dir = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root)
         self.create_backup_dir()
-        self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None
+        if blob_path:
+            self.s3 = storage_s3.AwsS3Vault(prefix="")
+        else:
+            self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None
+
         self._cancel_event = Event()
         if configs.get_encryption():
             self.encryption = True
diff --git a/docker/granular/pg_restore.py b/docker/granular/pg_restore.py
index 9743bce..c3a2a4f 100644
--- a/docker/granular/pg_restore.py
+++ b/docker/granular/pg_restore.py
@@ -63,7 +63,10 @@ def __init__(self, databases, force, restore_request, databases_mapping, owners_
         self.owners_mapping = owners_mapping
         self.bin_path = configs.get_pgsql_bin_path(self.postgres_version)
         self.parallel_jobs = configs.get_parallel_jobs()
-        self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None
+        if blobPath:
+            self.s3 = storage_s3.AwsS3Vault(prefix="")
+        else:
+            self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None
         self.blob_path = blobPath
         self.backup_dir = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root)
         self.create_backup_dir(self.backup_dir)
@@ -117,7 +120,6 @@ def update_status(self, key, value, database=None, flush=False):
             database_details = {}
             if self.databases_mapping and self.databases_mapping[database]:
                 database_details['newDatabaseName'] = self.databases_mapping[database]
-                databases_section[database] = database_details
 
             database_details[key] = value
             databases_section[database] = database_details
@@ -378,6 +380,7 @@ def restore_single_database(self, database):
                 if "ROLLBACK" in line:
                     self.log.error("ROLLBACK")
                     raise backups.RestoreFailedException(database, "ROLLBACK")
+        self.update_status('duration', int(time.time() - db_start), database=database, flush=True)
         self.log.info(self.log_msg("Successful restored"))
         self.pg_restore_proc = None
         try:
@@ -484,7 +487,7 @@ def restore_single_database(self, database):
                 with open(stderr_path, 'r') as f:
                     raise backups.RestoreFailedException(database, '\n'.join(f.readlines()))
             else:
-                self.update_status('duration', int(time.time() - db_start), database=database)
+                self.update_status('duration', int(time.time() - db_start), database=database, flush=True)
 
                 if database != new_bd_name:
                     self.log.info(self.log_msg("Database '%s' has been successfully restored with new name '%s'." %
diff --git a/docker/granular/storage_s3.py b/docker/granular/storage_s3.py
index e273b68..4876949 100644
--- a/docker/granular/storage_s3.py
+++ b/docker/granular/storage_s3.py
@@ -39,7 +39,7 @@ class AwsS3Vault:
     __log = logging.getLogger("AwsS3Granular")
 
     def __init__(self, cluster_name=None, cache_enabled=False,
-                 aws_s3_bucket_listing=None):
+                 aws_s3_bucket_listing=None, prefix=None):
         self.bucket = bucket or os.getenv("CONTAINER") or os.getenv("AWS_S3_BUCKET") or os.getenv("S3_BUCKET")
         self.console = None
 
@@ -47,7 +47,10 @@ def __init__(self, cluster_name=None, cache_enabled=False,
        self.cache_enabled = cache_enabled
        self.cached_state = {}
        self.aws_s3_bucket_listing = aws_s3_bucket_listing
-       self.aws_prefix = os.getenv("AWS_S3_PREFIX", "")
+       if prefix is not None:
+           self.aws_prefix = prefix
+       else:
+           self.aws_prefix = os.getenv("AWS_S3_PREFIX", "")
 
        if not self.bucket or not isinstance(self.bucket, str) or not self.bucket.strip():
            raise ValueError("S3 bucket is not configured. Set one of CONTAINER, AWS_S3_BUCKET, or S3_BUCKET.")
@@ -64,31 +67,31 @@ def get_s3_client(self):
     def upload_file(self, file_path, blob_path=None, backup_id=None):
         if blob_path:
             file_name = file_path.rsplit('/',1)[1]
-            s3FilePath = self.aws_prefix + f'{blob_path}/{backup_id}/{file_name}'
+            s3FilePath = self.get_prefixed_path(f'{blob_path}/{backup_id}/{file_name}')
             return self.get_s3_client().upload_file(file_path, self.bucket, s3FilePath)
         else:
-            return self.get_s3_client().upload_file(file_path, self.bucket, self.aws_prefix + file_path)
+            return self.get_s3_client().upload_file(file_path, self.bucket, self.get_prefixed_path(file_path))
 
     @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT)
     def delete_file(self, filename):
-        return self.get_s3_client().delete_object(Bucket=self.bucket, Key=self.aws_prefix + filename)
+        return self.get_s3_client().delete_object(Bucket=self.bucket, Key=self.get_prefixed_path(filename))
 
     @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT)
     def delete_objects(self, filename):
-        objects_to_delete = self.get_s3_client().list_objects(Bucket=self.bucket, Prefix=self.aws_prefix + filename)
+        objects_to_delete = self.get_s3_client().list_objects(Bucket=self.bucket, Prefix=self.get_prefixed_path(filename))
         for obj in objects_to_delete.get('Contents', []):
             self.get_s3_client().delete_object(Bucket=self.bucket, Key=obj['Key'])
 
     @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT)
     def read_object(self, file_path):
-        self.__log.info("Reading object %s" % self.aws_prefix + file_path)
-        obj = self.get_s3_client().get_object(Bucket=self.bucket, Key=self.aws_prefix + file_path)
+        self.__log.info("Reading object %s" % self.get_prefixed_path(file_path))
+        obj = self.get_s3_client().get_object(Bucket=self.bucket, Key=self.get_prefixed_path(file_path))
         # self.__log.info(obj['Body'].read().decode('utf8'))
         return obj['Body'].read().decode('utf8')
 
     @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT)
     def get_file_size(self, file_path):
-        obj = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.aws_prefix + file_path)
+        obj = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.get_prefixed_path(file_path))
         if 'Contents' in obj:
             for field in obj["Contents"]:
                 return field["Size"]
@@ -103,10 +106,10 @@ def download_file(self, filename, backup_id, blob_path=None):
                 file_path = f'{blob_path}/{backup_id}/{only_file_name}'
                 logging.info(f'Downloading file {file_path} to {filename}')
 
-                self.get_s3_client().download_file(self.bucket, self.aws_prefix + file_path, filename)
+                self.get_s3_client().download_file(self.bucket, self.get_prefixed_path(file_path), filename)
             else:
-                logging.info("Downloading file {}" .format(self.aws_prefix + filename))
-                self.get_s3_client().download_file(self.bucket, self.aws_prefix + filename, filename)
+                logging.info("Downloading file {}" .format(self.get_prefixed_path(filename)))
+                self.get_s3_client().download_file(self.bucket, self.get_prefixed_path(filename), filename)
         except Exception as e:
             raise e
         return
@@ -115,20 +118,20 @@ def download_file(self, filename, backup_id, blob_path=None):
     def is_file_exists(self, file):
         exists = True
         try:
-            self.get_s3_client().head_object(Bucket=self.bucket, Key=self.aws_prefix + file)
+            self.get_s3_client().head_object(Bucket=self.bucket, Key=self.get_prefixed_path(file))
         except botocore.exceptions.ClientError as e:
             exists = False
         return exists
 
     def is_s3_storage_path_exist(self, storage):
-        bucket = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.aws_prefix + storage)
+        bucket = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.get_prefixed_path(storage))
         if 'Contents' in bucket:
             s3_storage_path = bucket['Contents'][0]["Key"]
             return True if storage in s3_storage_path else False
         return False
 
     def get_granular_namespaces(self, storage):
-        bucket = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.aws_prefix + storage)
+        bucket = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.get_prefixed_path(storage))
         namespaces = []
         if 'Contents' in bucket:
             for obj in bucket["Contents"]:
@@ -141,7 +144,7 @@ def get_granular_namespaces(self, storage):
         return namespaces
 
     def get_backup_ids(self, storage, namespace):
-        namespaced_path = self.aws_prefix + storage + "/" + namespace
+        namespaced_path = self.get_prefixed_path(storage + "/" + namespace)
         bucket = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=namespaced_path)
         backup_ids = []
         if 'Contents' in bucket:
@@ -153,3 +156,11 @@
             else:
                 pass
         return backup_ids
+
+    def get_prefixed_path(self, path):
+        if self.aws_prefix:
+            return self.aws_prefix + path
+        else:
+            if path.startswith("/"):
+                return path[1:]
+            return path
\ No newline at end of file
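# Note (not part of the patch): a minimal standalone sketch of the key resolution that
# the new `prefix` argument and `get_prefixed_path` helper introduce. `resolve_key`
# mirrors the logic added to storage_s3.py; the "granular/" prefix and sample paths
# below are illustrative only and do not come from the change itself.
import os

def resolve_key(path, prefix=None):
    # An explicit prefix (even "") overrides AWS_S3_PREFIX; when no prefix is in
    # effect, a leading "/" is stripped so the key stays relative in the bucket.
    aws_prefix = prefix if prefix is not None else os.getenv("AWS_S3_PREFIX", "")
    if aws_prefix:
        return aws_prefix + path
    return path[1:] if path.startswith("/") else path

if __name__ == "__main__":
    os.environ["AWS_S3_PREFIX"] = "granular/"                      # illustrative value
    print(resolve_key("storage/ns/backup-1/db.dump"))              # granular/storage/ns/backup-1/db.dump
    print(resolve_key("storage/ns/backup-1/db.dump", prefix=""))   # storage/ns/backup-1/db.dump
    print(resolve_key("/tmp/db.dump", prefix=""))                  # tmp/db.dump

# Passing prefix="" in the blobPath-driven code paths appears intended to keep
# externally supplied blob paths from being rewritten by AWS_S3_PREFIX.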