This repository was archived by the owner on Dec 5, 2025. It is now read-only.
Merged
16 changes: 10 additions & 6 deletions docker/granular/granular.py
@@ -1064,7 +1064,7 @@ class NewBackup(flask_restful.Resource):
     def __init__(self):
         self.log = logging.getLogger("NewBackup")
         self.allowed_fields = ["storageName", "blobPath", "databases"]
-        self.s3 = storage_s3.AwsS3Vault() if os.environ.get("STORAGE_TYPE") == "s3" else None
+        self.s3 = storage_s3.AwsS3Vault(prefix="")

     @staticmethod
     def get_endpoints():
@@ -1142,7 +1142,7 @@ class NewBackupStatus(flask_restful.Resource):

     def __init__(self):
         self.log = logging.getLogger("NewBackupStatus")
-        self.s3 = storage_s3.AwsS3Vault() if os.environ.get("STORAGE_TYPE") == "s3" else None
+        self.s3 = storage_s3.AwsS3Vault(prefix="")

     @staticmethod
     def get_endpoints():
@@ -1273,7 +1273,7 @@ class NewRestore(flask_restful.Resource):

     def __init__(self):
         self.log = logging.getLogger("NewRestore")
-        self.s3 = storage_s3.AwsS3Vault() if os.environ.get("STORAGE_TYPE") == "s3" else None
+        self.s3 = storage_s3.AwsS3Vault(prefix="")

     @staticmethod
     def get_endpoints():
@@ -1382,10 +1382,13 @@ def post(self, backup_id):
        for prev in (requested or []):
            prev_name = prev or ""
            restored_as = databases_mapping.get(prev_name) if isinstance(databases_mapping, dict) else None
+            status = "notStarted" if prev_name else ""
+            if dry_run:
+                status = "Successful"
            dbs_out.append({
                "previousDatabaseName": prev_name,
                "databaseName": restored_as or prev_name,
-                "status": "notStarted" if prev_name else "",
+                "status": status,
                "creationTime": created_iso
            })

@@ -1416,7 +1419,8 @@ def post(self, backup_id):
        }

        if dry_run:
-            return enriched, http.client.OK
+            enriched["status"] = "Successful"
+            return enriched, http.client.OK

        status_path = backups.build_restore_status_file_path(backup_id, tracking_id, namespace)

@@ -1440,7 +1444,7 @@ class NewRestoreStatus(flask_restful.Resource):

     def __init__(self):
         self.log = logging.getLogger("NewRestoreStatus")
-        self.s3 = storage_s3.AwsS3Vault() if os.environ.get("STORAGE_TYPE") == "s3" else None
+        self.s3 = storage_s3.AwsS3Vault(prefix="")

     @staticmethod
     def get_endpoints():
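Reviewer note: a minimal, runnable sketch of what the dry-run branch in `NewRestore.post` now reports per database. The sample values for `requested`, `databases_mapping`, and `created_iso` are hypothetical; only the loop body mirrors the diff above.

```python
# Hypothetical inputs mirroring the variables used in NewRestore.post.
requested = ["orders", "billing"]
databases_mapping = {"orders": "orders_restored"}
dry_run = True
created_iso = "2025-01-01T00:00:00Z"

dbs_out = []
for prev in (requested or []):
    prev_name = prev or ""
    restored_as = databases_mapping.get(prev_name) if isinstance(databases_mapping, dict) else None
    status = "notStarted" if prev_name else ""
    if dry_run:
        # Dry runs perform no work, so every requested database is
        # reported as already Successful.
        status = "Successful"
    dbs_out.append({
        "previousDatabaseName": prev_name,
        "databaseName": restored_as or prev_name,
        "status": status,
        "creationTime": created_iso,
    })

assert dbs_out[0]["databaseName"] == "orders_restored"
assert all(db["status"] == "Successful" for db in dbs_out)
```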
6 changes: 5 additions & 1 deletion docker/granular/pg_backup.py
@@ -53,7 +53,11 @@ def __init__(self, databases, backup_request, blob_path=None):
        self.blob_path = blob_path
        self.backup_dir = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root)
        self.create_backup_dir()
-        self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None
+        if blob_path:
+            self.s3 = storage_s3.AwsS3Vault(prefix="")
+        else:
+            self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None
+
        self._cancel_event = Event()
        if configs.get_encryption():
            self.encryption = True
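Reviewer note: the constructor now selects the vault based on `blob_path`. A small sketch of that selection logic under stated assumptions: `FakeVault` is a stand-in for `storage_s3.AwsS3Vault`, and `os.environ.get` replaces the `os.environ['STORAGE_TYPE']` lookup so the sketch runs with the variable unset.

```python
import os

class FakeVault:
    """Stand-in for storage_s3.AwsS3Vault; records only the prefix argument."""
    def __init__(self, prefix=None):
        self.prefix = prefix

def make_vault(blob_path):
    if blob_path:
        # Caller supplied an explicit blob path: force an empty prefix so
        # AWS_S3_PREFIX is not prepended on top of it.
        return FakeVault(prefix="")
    # Legacy behavior: only build a vault when the backend is S3.
    return FakeVault() if os.environ.get("STORAGE_TYPE") == "s3" else None

assert make_vault("custom/blob/path").prefix == ""
os.environ["STORAGE_TYPE"] = "filesystem"
assert make_vault(None) is None
```

Note that with a `blob_path` set, a vault is now constructed regardless of `STORAGE_TYPE`, where the old line would have left `self.s3` as `None`.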
9 changes: 6 additions & 3 deletions docker/granular/pg_restore.py
@@ -63,7 +63,10 @@ def __init__(self, databases, force, restore_request, databases_mapping, owners_
        self.owners_mapping = owners_mapping
        self.bin_path = configs.get_pgsql_bin_path(self.postgres_version)
        self.parallel_jobs = configs.get_parallel_jobs()
-        self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None
+        if blobPath:
+            self.s3 = storage_s3.AwsS3Vault(prefix="")
+        else:
+            self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None
        self.blob_path = blobPath
        self.backup_dir = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root)
        self.create_backup_dir(self.backup_dir)
@@ -117,7 +120,6 @@ def update_status(self, key, value, database=None, flush=False):
            database_details = {}
            if self.databases_mapping and self.databases_mapping[database]:
                database_details['newDatabaseName'] = self.databases_mapping[database]
-            databases_section[database] = database_details

        database_details[key] = value
        databases_section[database] = database_details
@@ -378,6 +380,7 @@ def restore_single_database(self, database):
                    if "ROLLBACK" in line:
                        self.log.error("ROLLBACK")
                        raise backups.RestoreFailedException(database, "ROLLBACK")
+            self.update_status('duration', int(time.time() - db_start), database=database, flush=True)
            self.log.info(self.log_msg("Successful restored"))
            self.pg_restore_proc = None
            try:
@@ -484,7 +487,7 @@ def restore_single_database(self, database):
                with open(stderr_path, 'r') as f:
                    raise backups.RestoreFailedException(database, '\n'.join(f.readlines()))
            else:
-                self.update_status('duration', int(time.time() - db_start), database=database)
+                self.update_status('duration', int(time.time() - db_start), database=database, flush=True)

            if database != new_bd_name:
                self.log.info(self.log_msg("Database '%s' has been successfully restored with new name '%s'." %
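Reviewer note: both `duration` updates now pass `flush=True`. A sketch of why that matters, assuming `update_status` buffers per-database details in memory and only writes the status file when flushed; the `StatusSketch` class and file path are hypothetical.

```python
import json
import time

class StatusSketch:
    """Hypothetical model of pg_restore's buffered status updates."""
    def __init__(self, path):
        self.path = path
        self.status = {"databases": {}}

    def update_status(self, key, value, database=None, flush=False):
        details = self.status["databases"].setdefault(database, {})
        details[key] = value
        if flush:
            # Persist immediately so pollers of the status file see the
            # final duration even if the process exits right afterwards.
            with open(self.path, "w") as f:
                json.dump(self.status, f)

db_start = time.time()
sketch = StatusSketch("/tmp/restore-status.json")
sketch.update_status("duration", int(time.time() - db_start),
                     database="orders", flush=True)
```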
43 changes: 27 additions & 16 deletions docker/granular/storage_s3.py
@@ -39,15 +39,18 @@ class AwsS3Vault:
    __log = logging.getLogger("AwsS3Granular")

    def __init__(self, cluster_name=None, cache_enabled=False,
-                 aws_s3_bucket_listing=None):
+                 aws_s3_bucket_listing=None, prefix=None):

        self.bucket = bucket or os.getenv("CONTAINER") or os.getenv("AWS_S3_BUCKET") or os.getenv("S3_BUCKET")
        self.console = None
        self.cluster_name = cluster_name
        self.cache_enabled = cache_enabled
        self.cached_state = {}
        self.aws_s3_bucket_listing = aws_s3_bucket_listing
-        self.aws_prefix = os.getenv("AWS_S3_PREFIX", "")
+        if prefix is not None:
+            self.aws_prefix = prefix
+        else:
+            self.aws_prefix = os.getenv("AWS_S3_PREFIX", "")

        if not self.bucket or not isinstance(self.bucket, str) or not self.bucket.strip():
            raise ValueError("S3 bucket is not configured. Set one of CONTAINER, AWS_S3_BUCKET, or S3_BUCKET.")
@@ -64,31 +67,31 @@ def get_s3_client(self):
    def upload_file(self, file_path, blob_path=None, backup_id=None):
        if blob_path:
            file_name = file_path.rsplit('/',1)[1]
-            s3FilePath = self.aws_prefix + f'{blob_path}/{backup_id}/{file_name}'
+            s3FilePath = self.get_prefixed_path(f'{blob_path}/{backup_id}/{file_name}')
            return self.get_s3_client().upload_file(file_path, self.bucket, s3FilePath)
        else:
-            return self.get_s3_client().upload_file(file_path, self.bucket, self.aws_prefix + file_path)
+            return self.get_s3_client().upload_file(file_path, self.bucket, self.get_prefixed_path(file_path))

    @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT)
    def delete_file(self, filename):
-        return self.get_s3_client().delete_object(Bucket=self.bucket, Key=self.aws_prefix + filename)
+        return self.get_s3_client().delete_object(Bucket=self.bucket, Key=self.get_prefixed_path(filename))

    @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT)
    def delete_objects(self, filename):
-        objects_to_delete = self.get_s3_client().list_objects(Bucket=self.bucket, Prefix=self.aws_prefix + filename)
+        objects_to_delete = self.get_s3_client().list_objects(Bucket=self.bucket, Prefix=self.get_prefixed_path(filename))
        for obj in objects_to_delete.get('Contents', []):
            self.get_s3_client().delete_object(Bucket=self.bucket, Key=obj['Key'])

    @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT)
    def read_object(self, file_path):
-        self.__log.info("Reading object %s" % self.aws_prefix + file_path)
-        obj = self.get_s3_client().get_object(Bucket=self.bucket, Key=self.aws_prefix + file_path)
+        self.__log.info("Reading object %s" % self.get_prefixed_path(file_path))
+        obj = self.get_s3_client().get_object(Bucket=self.bucket, Key=self.get_prefixed_path(file_path))
        # self.__log.info(obj['Body'].read().decode('utf8'))
        return obj['Body'].read().decode('utf8')

    @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT)
    def get_file_size(self, file_path):
-        obj = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.aws_prefix + file_path)
+        obj = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.get_prefixed_path(file_path))
        if 'Contents' in obj:
            for field in obj["Contents"]:
                return field["Size"]
@@ -103,10 +106,10 @@ def download_file(self, filename, backup_id, blob_path=None):
                file_path = f'{blob_path}/{backup_id}/{only_file_name}'
                logging.info(f'Downloading file {file_path} to {filename}')

-                self.get_s3_client().download_file(self.bucket, self.aws_prefix + file_path, filename)
+                self.get_s3_client().download_file(self.bucket, self.get_prefixed_path(file_path), filename)
            else:
-                logging.info("Downloading file {}" .format(self.aws_prefix + filename))
-                self.get_s3_client().download_file(self.bucket, self.aws_prefix + filename, filename)
+                logging.info("Downloading file {}" .format(self.get_prefixed_path(filename)))
+                self.get_s3_client().download_file(self.bucket, self.get_prefixed_path(filename), filename)
        except Exception as e:
            raise e
        return
@@ -115,20 +118,20 @@ def is_file_exists(self, file):
    def is_file_exists(self, file):
        exists = True
        try:
-            self.get_s3_client().head_object(Bucket=self.bucket, Key=self.aws_prefix + file)
+            self.get_s3_client().head_object(Bucket=self.bucket, Key=self.get_prefixed_path(file))
        except botocore.exceptions.ClientError as e:
            exists = False
        return exists

    def is_s3_storage_path_exist(self, storage):
-        bucket = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.aws_prefix + storage)
+        bucket = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.get_prefixed_path(storage))
        if 'Contents' in bucket:
            s3_storage_path = bucket['Contents'][0]["Key"]
            return True if storage in s3_storage_path else False
        return False

    def get_granular_namespaces(self, storage):
-        bucket = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.aws_prefix + storage)
+        bucket = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=self.get_prefixed_path(storage))
        namespaces = []
        if 'Contents' in bucket:
            for obj in bucket["Contents"]:
@@ -141,7 +144,7 @@ def get_granular_namespaces(self, storage):
        return namespaces

    def get_backup_ids(self, storage, namespace):
-        namespaced_path = self.aws_prefix + storage + "/" + namespace
+        namespaced_path = self.get_prefixed_path(storage + "/" + namespace)
        bucket = self.get_s3_client().list_objects_v2(Bucket=self.bucket, Prefix=namespaced_path)
        backup_ids = []
        if 'Contents' in bucket:
@@ -153,3 +156,11 @@
            else:
                pass
        return backup_ids
+
+    def get_prefixed_path(self, path):
+        if self.aws_prefix:
+            return self.aws_prefix + path
+        else:
+            if path.startswith("/"):
+                return path[1:]
+            return path
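Reviewer note: a runnable sketch of the resulting path resolution. `PrefixSketch` is hypothetical and mirrors only the `prefix`/`AWS_S3_PREFIX` handling introduced in this PR.

```python
class PrefixSketch:
    """Mirrors AwsS3Vault's prefix handling from this diff, nothing else."""
    def __init__(self, prefix=None, env_prefix=""):
        # prefix=None falls back to AWS_S3_PREFIX (env_prefix here);
        # prefix="" forces an unprefixed, bucket-relative key.
        self.aws_prefix = prefix if prefix is not None else env_prefix

    def get_prefixed_path(self, path):
        if self.aws_prefix:
            return self.aws_prefix + path
        # No prefix configured: strip a leading slash so the S3 key
        # is bucket-relative.
        if path.startswith("/"):
            return path[1:]
        return path

# Default construction keeps the environment prefix:
assert PrefixSketch(env_prefix="tenant-a/").get_prefixed_path("backups/id1") == "tenant-a/backups/id1"
# The REST resources pass prefix="", so caller-supplied paths are used verbatim:
assert PrefixSketch(prefix="", env_prefix="tenant-a/").get_prefixed_path("/backups/id1") == "backups/id1"
```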