diff --git a/.gitignore b/.gitignore index 294bfde..bb2367d 100644 --- a/.gitignore +++ b/.gitignore @@ -164,4 +164,15 @@ dmypy.json Makefile # debugging files -scorer_demo.py \ No newline at end of file +scorer_demo.py +models/ViT-L-14.pt +models/ImageReward.pt +models/med_config.json +models/HPS_v2_compressed.pt +models/pytorch_model.bin +TEST.py +/mscoco/ +/mscoco.parquet +models/CLIP-ViT-L-14.pt +models/HPS_v2.1.pt +models/Real.pt diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..3a91067 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..14115d6 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/sd-webui-bayesian-merger.iml b/.idea/sd-webui-bayesian-merger.iml new file mode 100644 index 0000000..da7efed --- /dev/null +++ b/.idea/sd-webui-bayesian-merger.iml @@ -0,0 +1,14 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/conf/config.tmpl.yaml b/conf/config.tmpl.yaml index f8dcaf7..5474f9f 100644 --- a/conf/config.tmpl.yaml +++ b/conf/config.tmpl.yaml @@ -14,7 +14,6 @@ work_device: cpu threads: 1 wildcards_dir: path/to/wildcards/folder -scorer_model_dir: path/to/scorer/models/folder model_a: path/to/model_a/file model_b: path/to/model_b/file @@ -32,11 +31,44 @@ guided_optimisation: False batch_size: 1 init_points: 1 n_iters: 1 +img_average_type: arithmetic # geometric, arithmetic, quadratic save_imgs: False -scorer_device: cpu # cuda -scorer_method: chad # chad, laion, manual +# scorer by type: +# Prompt-Image Alignment: blip, clip +# Aesthetic: chad, laion +# Hybrid(PIA + AES): ir, hpsv2, pick +# Anime/Illustration: shadow, cafe, wdaes +# Misc: manual, noai, iqa +# +# !!!! IQA ARE NOT IMPLEMENTED YET !!!! +# +# Notes: +# 1) recomended tested safe setup is [laion, chad, clip, blip, ir] with weights 0.5, 0.5, 1, 1, 1 + +scorer_method: [clip, blip, laion, chad, ir] +scorer_average_type: arithmetic # geometric, arithmetic, quadratic +scorer_weight: + #blip: 0.5 + #chad: 2 + # example above, default is 1 +scorer_default_device: cpu # cuda +scorer_device: + #blip: cpu + #chad: cuda + # example above, default is scorer default device +scorer_model_dir: path/to/scorer/models/folder +scorer_alt_location: + #blip: + #model_name: scorer.pth + #model_dir: path/to/scorer/scorer.pth + #chad: + #model_name: scorer.pt + #model_dir: path/to/scorer/scorer.pt + # example above, default downloads them in the scorer_model_dir(this option is here if you already have them downloaded somewhere else) +scorer_print_individual: False + save_best: False best_format: safetensors # ckpt diff --git a/install.py b/install.py index 91e466e..5961736 100644 --- a/install.py +++ b/install.py @@ -8,7 +8,6 @@ with open(Path(extension_dir, "requirements.txt"), "r", encoding="utf-8") as f: reqs = f.readlines() - print(reqs) for req in reqs: req = req.strip() diff --git a/requirements.txt b/requirements.txt index d0744c6..d3e5674 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,6 @@ sd-meh==0.9.5 lightgbm scikit-learn openai-clip +tensordict +timm +fairscale diff --git a/sd_webui_bayesian_merger/models/BLIP/__init__.py b/sd_webui_bayesian_merger/models/BLIP/__init__.py new file mode 100644 index 0000000..0a617e7 --- /dev/null +++ b/sd_webui_bayesian_merger/models/BLIP/__init__.py @@ -0,0 +1 @@ +from .blip_pretrain import * \ No newline at end of file diff --git a/sd_webui_bayesian_merger/models/BLIP/blip.py b/sd_webui_bayesian_merger/models/BLIP/blip.py new file mode 100644 index 0000000..0dfdb72 --- /dev/null +++ b/sd_webui_bayesian_merger/models/BLIP/blip.py @@ -0,0 +1,70 @@ +''' + * Adapted from BLIP (https://github.com/salesforce/BLIP) +''' + +import warnings +warnings.filterwarnings("ignore") + +import torch +import os +from urllib.parse import urlparse +from timm.models.hub import download_cached_file +from transformers import BertTokenizer +from .vit import VisionTransformer, interpolate_pos_embed + + +def init_tokenizer(): + tokenizer = BertTokenizer.from_pretrained('bert-base-uncased') + tokenizer.add_special_tokens({'bos_token':'[DEC]'}) + tokenizer.add_special_tokens({'additional_special_tokens':['[ENC]']}) + tokenizer.enc_token_id = tokenizer.additional_special_tokens_ids[0] + return tokenizer + + +def create_vit(vit, image_size, use_grad_checkpointing=False, ckpt_layer=0, drop_path_rate=0): + + assert vit in ['base', 'large'], "vit parameter must be base or large" + if vit=='base': + vision_width = 768 + visual_encoder = VisionTransformer(img_size=image_size, patch_size=16, embed_dim=vision_width, depth=12, + num_heads=12, use_grad_checkpointing=use_grad_checkpointing, ckpt_layer=ckpt_layer, + drop_path_rate=0 or drop_path_rate + ) + elif vit=='large': + vision_width = 1024 + visual_encoder = VisionTransformer(img_size=image_size, patch_size=16, embed_dim=vision_width, depth=24, + num_heads=16, use_grad_checkpointing=use_grad_checkpointing, ckpt_layer=ckpt_layer, + drop_path_rate=0.1 or drop_path_rate + ) + return visual_encoder, vision_width + + +def is_url(url_or_filename): + parsed = urlparse(url_or_filename) + return parsed.scheme in ("http", "https") + +def load_checkpoint(model,url_or_filename): + if is_url(url_or_filename): + cached_file = download_cached_file(url_or_filename, check_hash=False, progress=True) + checkpoint = torch.load(cached_file, map_location='cpu') + elif os.path.isfile(url_or_filename): + checkpoint = torch.load(url_or_filename, map_location='cpu') + else: + raise RuntimeError('checkpoint url or path is invalid') + + state_dict = checkpoint['model'] + + state_dict['visual_encoder.pos_embed'] = interpolate_pos_embed(state_dict['visual_encoder.pos_embed'],model.visual_encoder) + if 'visual_encoder_m.pos_embed' in model.state_dict().keys(): + state_dict['visual_encoder_m.pos_embed'] = interpolate_pos_embed(state_dict['visual_encoder_m.pos_embed'], + model.visual_encoder_m) + for key in model.state_dict().keys(): + if key in state_dict.keys(): + if state_dict[key].shape!=model.state_dict()[key].shape: + print(key, ": ", state_dict[key].shape, ', ', model.state_dict()[key].shape) + del state_dict[key] + + msg = model.load_state_dict(state_dict,strict=False) + print('load checkpoint from %s'%url_or_filename) + return model,msg + diff --git a/sd_webui_bayesian_merger/models/BLIP/blip_pretrain.py b/sd_webui_bayesian_merger/models/BLIP/blip_pretrain.py new file mode 100644 index 0000000..793cb07 --- /dev/null +++ b/sd_webui_bayesian_merger/models/BLIP/blip_pretrain.py @@ -0,0 +1,43 @@ +''' + * Adapted from BLIP (https://github.com/salesforce/BLIP) +''' + +import transformers +transformers.logging.set_verbosity_error() + +from torch import nn +import os +from .med import BertConfig, BertModel +from .blip import create_vit, init_tokenizer + +class BLIP_Pretrain(nn.Module): + def __init__(self, + med_config = "med_config.json", + image_size = 224, + vit = 'base', + vit_grad_ckpt = False, + vit_ckpt_layer = 0, + embed_dim = 256, + queue_size = 57600, + momentum = 0.995, + ): + """ + Args: + med_config (str): path for the mixture of encoder-decoder model's configuration file + image_size (int): input image size + vit (str): model size of vision transformer + """ + super().__init__() + + self.visual_encoder, vision_width = create_vit(vit,image_size, vit_grad_ckpt, vit_ckpt_layer, 0) + + self.tokenizer = init_tokenizer() + encoder_config = BertConfig.from_json_file(med_config) + encoder_config.encoder_width = vision_width + self.text_encoder = BertModel(config=encoder_config, add_pooling_layer=False) + + text_width = self.text_encoder.config.hidden_size + + self.vision_proj = nn.Linear(vision_width, embed_dim) + self.text_proj = nn.Linear(text_width, embed_dim) + diff --git a/sd_webui_bayesian_merger/models/BLIP/med.py b/sd_webui_bayesian_merger/models/BLIP/med.py new file mode 100644 index 0000000..426f468 --- /dev/null +++ b/sd_webui_bayesian_merger/models/BLIP/med.py @@ -0,0 +1,947 @@ +''' + * Adapted from BLIP (https://github.com/salesforce/BLIP) + * Based on huggingface code base + * https://github.com/huggingface/transformers/blob/v4.15.0/src/transformers/models/bert +''' + +import math +from typing import Tuple + +import torch +from torch import Tensor, device, nn +import torch.utils.checkpoint +from torch import nn +from torch.nn import CrossEntropyLoss + +from transformers.activations import ACT2FN +from transformers.file_utils import ( + ModelOutput, +) +from transformers.modeling_outputs import ( + BaseModelOutputWithPastAndCrossAttentions, + BaseModelOutputWithPoolingAndCrossAttentions, + CausalLMOutputWithCrossAttentions, + MaskedLMOutput, + MultipleChoiceModelOutput, + NextSentencePredictorOutput, + QuestionAnsweringModelOutput, + SequenceClassifierOutput, + TokenClassifierOutput, +) +from transformers.modeling_utils import ( + PreTrainedModel, + apply_chunking_to_forward, + find_pruneable_heads_and_indices, + prune_linear_layer, +) +from transformers.utils import logging +from transformers.models.bert.configuration_bert import BertConfig + + +logger = logging.get_logger(__name__) + + +class BertEmbeddings(nn.Module): + """Construct the embeddings from word and position embeddings.""" + + def __init__(self, config): + super().__init__() + self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id) + self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size) + + # self.LayerNorm is not snake-cased to stick with TensorFlow model variable name and be able to load + # any TensorFlow checkpoint file + self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) + self.dropout = nn.Dropout(config.hidden_dropout_prob) + + # position_ids (1, len position emb) is contiguous in memory and exported when serialized + self.register_buffer("position_ids", torch.arange(config.max_position_embeddings).expand((1, -1))) + self.position_embedding_type = getattr(config, "position_embedding_type", "absolute") + + self.config = config + + def forward( + self, input_ids=None, position_ids=None, inputs_embeds=None, past_key_values_length=0 + ): + if input_ids is not None: + input_shape = input_ids.size() + else: + input_shape = inputs_embeds.size()[:-1] + + seq_length = input_shape[1] + + if position_ids is None: + position_ids = self.position_ids[:, past_key_values_length : seq_length + past_key_values_length] + + if inputs_embeds is None: + inputs_embeds = self.word_embeddings(input_ids) + + embeddings = inputs_embeds + + if self.position_embedding_type == "absolute": + position_embeddings = self.position_embeddings(position_ids) + embeddings += position_embeddings + embeddings = self.LayerNorm(embeddings) + embeddings = self.dropout(embeddings) + return embeddings + + +class BertSelfAttention(nn.Module): + def __init__(self, config, is_cross_attention): + super().__init__() + self.config = config + if config.hidden_size % config.num_attention_heads != 0 and not hasattr(config, "embedding_size"): + raise ValueError( + "The hidden size (%d) is not a multiple of the number of attention " + "heads (%d)" % (config.hidden_size, config.num_attention_heads) + ) + + self.num_attention_heads = config.num_attention_heads + self.attention_head_size = int(config.hidden_size / config.num_attention_heads) + self.all_head_size = self.num_attention_heads * self.attention_head_size + + self.query = nn.Linear(config.hidden_size, self.all_head_size) + if is_cross_attention: + self.key = nn.Linear(config.encoder_width, self.all_head_size) + self.value = nn.Linear(config.encoder_width, self.all_head_size) + else: + self.key = nn.Linear(config.hidden_size, self.all_head_size) + self.value = nn.Linear(config.hidden_size, self.all_head_size) + + self.dropout = nn.Dropout(config.attention_probs_dropout_prob) + self.position_embedding_type = getattr(config, "position_embedding_type", "absolute") + if self.position_embedding_type == "relative_key" or self.position_embedding_type == "relative_key_query": + self.max_position_embeddings = config.max_position_embeddings + self.distance_embedding = nn.Embedding(2 * config.max_position_embeddings - 1, self.attention_head_size) + self.save_attention = False + + def save_attn_gradients(self, attn_gradients): + self.attn_gradients = attn_gradients + + def get_attn_gradients(self): + return self.attn_gradients + + def save_attention_map(self, attention_map): + self.attention_map = attention_map + + def get_attention_map(self): + return self.attention_map + + def transpose_for_scores(self, x): + new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size) + x = x.view(*new_x_shape) + return x.permute(0, 2, 1, 3) + + def forward( + self, + hidden_states, + attention_mask=None, + head_mask=None, + encoder_hidden_states=None, + encoder_attention_mask=None, + past_key_value=None, + output_attentions=False, + ): + mixed_query_layer = self.query(hidden_states) + + # If this is instantiated as a cross-attention module, the keys + # and values come from an encoder; the attention mask needs to be + # such that the encoder's padding tokens are not attended to. + is_cross_attention = encoder_hidden_states is not None + + if is_cross_attention: + key_layer = self.transpose_for_scores(self.key(encoder_hidden_states)) + value_layer = self.transpose_for_scores(self.value(encoder_hidden_states)) + attention_mask = encoder_attention_mask + elif past_key_value is not None: + key_layer = self.transpose_for_scores(self.key(hidden_states)) + value_layer = self.transpose_for_scores(self.value(hidden_states)) + key_layer = torch.cat([past_key_value[0], key_layer], dim=2) + value_layer = torch.cat([past_key_value[1], value_layer], dim=2) + else: + key_layer = self.transpose_for_scores(self.key(hidden_states)) + value_layer = self.transpose_for_scores(self.value(hidden_states)) + + query_layer = self.transpose_for_scores(mixed_query_layer) + + past_key_value = (key_layer, value_layer) + + # Take the dot product between "query" and "key" to get the raw attention scores. + attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2)) + + if self.position_embedding_type == "relative_key" or self.position_embedding_type == "relative_key_query": + seq_length = hidden_states.size()[1] + position_ids_l = torch.arange(seq_length, dtype=torch.long, device=hidden_states.device).view(-1, 1) + position_ids_r = torch.arange(seq_length, dtype=torch.long, device=hidden_states.device).view(1, -1) + distance = position_ids_l - position_ids_r + positional_embedding = self.distance_embedding(distance + self.max_position_embeddings - 1) + positional_embedding = positional_embedding.to(dtype=query_layer.dtype) # fp16 compatibility + + if self.position_embedding_type == "relative_key": + relative_position_scores = torch.einsum("bhld,lrd->bhlr", query_layer, positional_embedding) + attention_scores = attention_scores + relative_position_scores + elif self.position_embedding_type == "relative_key_query": + relative_position_scores_query = torch.einsum("bhld,lrd->bhlr", query_layer, positional_embedding) + relative_position_scores_key = torch.einsum("bhrd,lrd->bhlr", key_layer, positional_embedding) + attention_scores = attention_scores + relative_position_scores_query + relative_position_scores_key + + attention_scores = attention_scores / math.sqrt(self.attention_head_size) + if attention_mask is not None: + # Apply the attention mask is (precomputed for all layers in BertModel forward() function) + attention_scores = attention_scores + attention_mask + + # Normalize the attention scores to probabilities. + attention_probs = nn.Softmax(dim=-1)(attention_scores) + + if is_cross_attention and self.save_attention: + self.save_attention_map(attention_probs) + attention_probs.register_hook(self.save_attn_gradients) + + # This is actually dropping out entire tokens to attend to, which might + # seem a bit unusual, but is taken from the original Transformer paper. + attention_probs_dropped = self.dropout(attention_probs) + + # Mask heads if we want to + if head_mask is not None: + attention_probs_dropped = attention_probs_dropped * head_mask + + context_layer = torch.matmul(attention_probs_dropped, value_layer) + + context_layer = context_layer.permute(0, 2, 1, 3).contiguous() + new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,) + context_layer = context_layer.view(*new_context_layer_shape) + + outputs = (context_layer, attention_probs) if output_attentions else (context_layer,) + + outputs = outputs + (past_key_value,) + return outputs + + +class BertSelfOutput(nn.Module): + def __init__(self, config): + super().__init__() + self.dense = nn.Linear(config.hidden_size, config.hidden_size) + self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) + self.dropout = nn.Dropout(config.hidden_dropout_prob) + + def forward(self, hidden_states, input_tensor): + hidden_states = self.dense(hidden_states) + hidden_states = self.dropout(hidden_states) + hidden_states = self.LayerNorm(hidden_states + input_tensor) + return hidden_states + + +class BertAttention(nn.Module): + def __init__(self, config, is_cross_attention=False): + super().__init__() + self.self = BertSelfAttention(config, is_cross_attention) + self.output = BertSelfOutput(config) + self.pruned_heads = set() + + def prune_heads(self, heads): + if len(heads) == 0: + return + heads, index = find_pruneable_heads_and_indices( + heads, self.self.num_attention_heads, self.self.attention_head_size, self.pruned_heads + ) + + # Prune linear layers + self.self.query = prune_linear_layer(self.self.query, index) + self.self.key = prune_linear_layer(self.self.key, index) + self.self.value = prune_linear_layer(self.self.value, index) + self.output.dense = prune_linear_layer(self.output.dense, index, dim=1) + + # Update hyper params and store pruned heads + self.self.num_attention_heads = self.self.num_attention_heads - len(heads) + self.self.all_head_size = self.self.attention_head_size * self.self.num_attention_heads + self.pruned_heads = self.pruned_heads.union(heads) + + def forward( + self, + hidden_states, + attention_mask=None, + head_mask=None, + encoder_hidden_states=None, + encoder_attention_mask=None, + past_key_value=None, + output_attentions=False, + ): + self_outputs = self.self( + hidden_states, + attention_mask, + head_mask, + encoder_hidden_states, + encoder_attention_mask, + past_key_value, + output_attentions, + ) + attention_output = self.output(self_outputs[0], hidden_states) + outputs = (attention_output,) + self_outputs[1:] # add attentions if we output them + return outputs + + +class BertIntermediate(nn.Module): + def __init__(self, config): + super().__init__() + self.dense = nn.Linear(config.hidden_size, config.intermediate_size) + if isinstance(config.hidden_act, str): + self.intermediate_act_fn = ACT2FN[config.hidden_act] + else: + self.intermediate_act_fn = config.hidden_act + + def forward(self, hidden_states): + hidden_states = self.dense(hidden_states) + hidden_states = self.intermediate_act_fn(hidden_states) + return hidden_states + + +class BertOutput(nn.Module): + def __init__(self, config): + super().__init__() + self.dense = nn.Linear(config.intermediate_size, config.hidden_size) + self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) + self.dropout = nn.Dropout(config.hidden_dropout_prob) + + def forward(self, hidden_states, input_tensor): + hidden_states = self.dense(hidden_states) + hidden_states = self.dropout(hidden_states) + hidden_states = self.LayerNorm(hidden_states + input_tensor) + return hidden_states + + +class BertLayer(nn.Module): + def __init__(self, config, layer_num): + super().__init__() + self.config = config + self.chunk_size_feed_forward = config.chunk_size_feed_forward + self.seq_len_dim = 1 + self.attention = BertAttention(config) + self.layer_num = layer_num + if self.config.add_cross_attention: + self.crossattention = BertAttention(config, is_cross_attention=self.config.add_cross_attention) + self.intermediate = BertIntermediate(config) + self.output = BertOutput(config) + + def forward( + self, + hidden_states, + attention_mask=None, + head_mask=None, + encoder_hidden_states=None, + encoder_attention_mask=None, + past_key_value=None, + output_attentions=False, + mode=None, + ): + # decoder uni-directional self-attention cached key/values tuple is at positions 1,2 + self_attn_past_key_value = past_key_value[:2] if past_key_value is not None else None + self_attention_outputs = self.attention( + hidden_states, + attention_mask, + head_mask, + output_attentions=output_attentions, + past_key_value=self_attn_past_key_value, + ) + attention_output = self_attention_outputs[0] + + outputs = self_attention_outputs[1:-1] + present_key_value = self_attention_outputs[-1] + + if mode=='multimodal': + assert encoder_hidden_states is not None, "encoder_hidden_states must be given for cross-attention layers" + + cross_attention_outputs = self.crossattention( + attention_output, + attention_mask, + head_mask, + encoder_hidden_states, + encoder_attention_mask, + output_attentions=output_attentions, + ) + attention_output = cross_attention_outputs[0] + outputs = outputs + cross_attention_outputs[1:-1] # add cross attentions if we output attention weights + layer_output = apply_chunking_to_forward( + self.feed_forward_chunk, self.chunk_size_feed_forward, self.seq_len_dim, attention_output + ) + outputs = (layer_output,) + outputs + + outputs = outputs + (present_key_value,) + + return outputs + + def feed_forward_chunk(self, attention_output): + intermediate_output = self.intermediate(attention_output) + layer_output = self.output(intermediate_output, attention_output) + return layer_output + + +class BertEncoder(nn.Module): + def __init__(self, config): + super().__init__() + self.config = config + self.layer = nn.ModuleList([BertLayer(config,i) for i in range(config.num_hidden_layers)]) + self.gradient_checkpointing = False + + def forward( + self, + hidden_states, + attention_mask=None, + head_mask=None, + encoder_hidden_states=None, + encoder_attention_mask=None, + past_key_values=None, + use_cache=None, + output_attentions=False, + output_hidden_states=False, + return_dict=True, + mode='multimodal', + ): + all_hidden_states = () if output_hidden_states else None + all_self_attentions = () if output_attentions else None + all_cross_attentions = () if output_attentions and self.config.add_cross_attention else None + + next_decoder_cache = () if use_cache else None + + for i in range(self.config.num_hidden_layers): + layer_module = self.layer[i] + if output_hidden_states: + all_hidden_states = all_hidden_states + (hidden_states,) + + layer_head_mask = head_mask[i] if head_mask is not None else None + past_key_value = past_key_values[i] if past_key_values is not None else None + + if self.gradient_checkpointing and self.training: + + if use_cache: + logger.warn( + "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..." + ) + use_cache = False + + def create_custom_forward(module): + def custom_forward(*inputs): + return module(*inputs, past_key_value, output_attentions) + + return custom_forward + + layer_outputs = torch.utils.checkpoint.checkpoint( + create_custom_forward(layer_module), + hidden_states, + attention_mask, + layer_head_mask, + encoder_hidden_states, + encoder_attention_mask, + mode=mode, + ) + else: + layer_outputs = layer_module( + hidden_states, + attention_mask, + layer_head_mask, + encoder_hidden_states, + encoder_attention_mask, + past_key_value, + output_attentions, + mode=mode, + ) + + hidden_states = layer_outputs[0] + if use_cache: + next_decoder_cache += (layer_outputs[-1],) + if output_attentions: + all_self_attentions = all_self_attentions + (layer_outputs[1],) + + if output_hidden_states: + all_hidden_states = all_hidden_states + (hidden_states,) + + if not return_dict: + return tuple( + v + for v in [ + hidden_states, + next_decoder_cache, + all_hidden_states, + all_self_attentions, + all_cross_attentions, + ] + if v is not None + ) + return BaseModelOutputWithPastAndCrossAttentions( + last_hidden_state=hidden_states, + past_key_values=next_decoder_cache, + hidden_states=all_hidden_states, + attentions=all_self_attentions, + cross_attentions=all_cross_attentions, + ) + + +class BertPooler(nn.Module): + def __init__(self, config): + super().__init__() + self.dense = nn.Linear(config.hidden_size, config.hidden_size) + self.activation = nn.Tanh() + + def forward(self, hidden_states): + # We "pool" the model by simply taking the hidden state corresponding + # to the first token. + first_token_tensor = hidden_states[:, 0] + pooled_output = self.dense(first_token_tensor) + pooled_output = self.activation(pooled_output) + return pooled_output + + +class BertPredictionHeadTransform(nn.Module): + def __init__(self, config): + super().__init__() + self.dense = nn.Linear(config.hidden_size, config.hidden_size) + if isinstance(config.hidden_act, str): + self.transform_act_fn = ACT2FN[config.hidden_act] + else: + self.transform_act_fn = config.hidden_act + self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) + + def forward(self, hidden_states): + hidden_states = self.dense(hidden_states) + hidden_states = self.transform_act_fn(hidden_states) + hidden_states = self.LayerNorm(hidden_states) + return hidden_states + + +class BertLMPredictionHead(nn.Module): + def __init__(self, config): + super().__init__() + self.transform = BertPredictionHeadTransform(config) + + # The output weights are the same as the input embeddings, but there is + # an output-only bias for each token. + self.decoder = nn.Linear(config.hidden_size, config.vocab_size, bias=False) + + self.bias = nn.Parameter(torch.zeros(config.vocab_size)) + + # Need a link between the two variables so that the bias is correctly resized with `resize_token_embeddings` + self.decoder.bias = self.bias + + def forward(self, hidden_states): + hidden_states = self.transform(hidden_states) + hidden_states = self.decoder(hidden_states) + return hidden_states + + +class BertOnlyMLMHead(nn.Module): + def __init__(self, config): + super().__init__() + self.predictions = BertLMPredictionHead(config) + + def forward(self, sequence_output): + prediction_scores = self.predictions(sequence_output) + return prediction_scores + + +class BertPreTrainedModel(PreTrainedModel): + """ + An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained + models. + """ + + config_class = BertConfig + base_model_prefix = "bert" + _keys_to_ignore_on_load_missing = [r"position_ids"] + + def _init_weights(self, module): + """ Initialize the weights """ + if isinstance(module, (nn.Linear, nn.Embedding)): + # Slightly different from the TF version which uses truncated_normal for initialization + # cf https://github.com/pytorch/pytorch/pull/5617 + module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) + elif isinstance(module, nn.LayerNorm): + module.bias.data.zero_() + module.weight.data.fill_(1.0) + if isinstance(module, nn.Linear) and module.bias is not None: + module.bias.data.zero_() + + +class BertModel(BertPreTrainedModel): + """ + The model can behave as an encoder (with only self-attention) as well as a decoder, in which case a layer of + cross-attention is added between the self-attention layers, following the architecture described in `Attention is + all you need `__ by Ashish Vaswani, Noam Shazeer, Niki Parmar, Jakob Uszkoreit, + Llion Jones, Aidan N. Gomez, Lukasz Kaiser and Illia Polosukhin. + argument and :obj:`add_cross_attention` set to :obj:`True`; an :obj:`encoder_hidden_states` is then expected as an + input to the forward pass. + """ + + def __init__(self, config, add_pooling_layer=True): + super().__init__(config) + self.config = config + + self.embeddings = BertEmbeddings(config) + + self.encoder = BertEncoder(config) + + self.pooler = BertPooler(config) if add_pooling_layer else None + + self.init_weights() + + + def get_input_embeddings(self): + return self.embeddings.word_embeddings + + def set_input_embeddings(self, value): + self.embeddings.word_embeddings = value + + def _prune_heads(self, heads_to_prune): + """ + Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer} See base + class PreTrainedModel + """ + for layer, heads in heads_to_prune.items(): + self.encoder.layer[layer].attention.prune_heads(heads) + + + def get_extended_attention_mask(self, attention_mask: Tensor, input_shape: Tuple[int], device: device, is_decoder: bool) -> Tensor: + """ + Makes broadcastable attention and causal masks so that future and masked tokens are ignored. + + Arguments: + attention_mask (:obj:`torch.Tensor`): + Mask with ones indicating tokens to attend to, zeros for tokens to ignore. + input_shape (:obj:`Tuple[int]`): + The shape of the input to the model. + device: (:obj:`torch.device`): + The device of the input to the model. + + Returns: + :obj:`torch.Tensor` The extended attention mask, with a the same dtype as :obj:`attention_mask.dtype`. + """ + # We can provide a self-attention mask of dimensions [batch_size, from_seq_length, to_seq_length] + # ourselves in which case we just need to make it broadcastable to all heads. + if attention_mask.dim() == 3: + extended_attention_mask = attention_mask[:, None, :, :] + elif attention_mask.dim() == 2: + # Provided a padding mask of dimensions [batch_size, seq_length] + # - if the model is a decoder, apply a causal mask in addition to the padding mask + # - if the model is an encoder, make the mask broadcastable to [batch_size, num_heads, seq_length, seq_length] + if is_decoder: + batch_size, seq_length = input_shape + + seq_ids = torch.arange(seq_length, device=device) + causal_mask = seq_ids[None, None, :].repeat(batch_size, seq_length, 1) <= seq_ids[None, :, None] + # in case past_key_values are used we need to add a prefix ones mask to the causal mask + # causal and attention masks must have same type with pytorch version < 1.3 + causal_mask = causal_mask.to(attention_mask.dtype) + + if causal_mask.shape[1] < attention_mask.shape[1]: + prefix_seq_len = attention_mask.shape[1] - causal_mask.shape[1] + causal_mask = torch.cat( + [ + torch.ones((batch_size, seq_length, prefix_seq_len), device=device, dtype=causal_mask.dtype), + causal_mask, + ], + axis=-1, + ) + + extended_attention_mask = causal_mask[:, None, :, :] * attention_mask[:, None, None, :] + else: + extended_attention_mask = attention_mask[:, None, None, :] + else: + raise ValueError( + "Wrong shape for input_ids (shape {}) or attention_mask (shape {})".format( + input_shape, attention_mask.shape + ) + ) + + # Since attention_mask is 1.0 for positions we want to attend and 0.0 for + # masked positions, this operation will create a tensor which is 0.0 for + # positions we want to attend and -10000.0 for masked positions. + # Since we are adding it to the raw scores before the softmax, this is + # effectively the same as removing these entirely. + extended_attention_mask = extended_attention_mask.to(dtype=self.dtype) # fp16 compatibility + extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0 + return extended_attention_mask + + def forward( + self, + input_ids=None, + attention_mask=None, + position_ids=None, + head_mask=None, + inputs_embeds=None, + encoder_embeds=None, + encoder_hidden_states=None, + encoder_attention_mask=None, + past_key_values=None, + use_cache=None, + output_attentions=None, + output_hidden_states=None, + return_dict=None, + is_decoder=False, + mode='multimodal', + ): + r""" + encoder_hidden_states (:obj:`torch.FloatTensor` of shape :obj:`(batch_size, sequence_length, hidden_size)`, `optional`): + Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention if + the model is configured as a decoder. + encoder_attention_mask (:obj:`torch.FloatTensor` of shape :obj:`(batch_size, sequence_length)`, `optional`): + Mask to avoid performing attention on the padding token indices of the encoder input. This mask is used in + the cross-attention if the model is configured as a decoder. Mask values selected in ``[0, 1]``: + - 1 for tokens that are **not masked**, + - 0 for tokens that are **masked**. + past_key_values (:obj:`tuple(tuple(torch.FloatTensor))` of length :obj:`config.n_layers` with each tuple having 4 tensors of shape :obj:`(batch_size, num_heads, sequence_length - 1, embed_size_per_head)`): + Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up decoding. + If :obj:`past_key_values` are used, the user can optionally input only the last :obj:`decoder_input_ids` + (those that don't have their past key value states given to this model) of shape :obj:`(batch_size, 1)` + instead of all :obj:`decoder_input_ids` of shape :obj:`(batch_size, sequence_length)`. + use_cache (:obj:`bool`, `optional`): + If set to :obj:`True`, :obj:`past_key_values` key value states are returned and can be used to speed up + decoding (see :obj:`past_key_values`). + """ + output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions + output_hidden_states = ( + output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states + ) + return_dict = return_dict if return_dict is not None else self.config.use_return_dict + + if is_decoder: + use_cache = use_cache if use_cache is not None else self.config.use_cache + else: + use_cache = False + + if input_ids is not None and inputs_embeds is not None: + raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time") + elif input_ids is not None: + input_shape = input_ids.size() + batch_size, seq_length = input_shape + device = input_ids.device + elif inputs_embeds is not None: + input_shape = inputs_embeds.size()[:-1] + batch_size, seq_length = input_shape + device = inputs_embeds.device + elif encoder_embeds is not None: + input_shape = encoder_embeds.size()[:-1] + batch_size, seq_length = input_shape + device = encoder_embeds.device + else: + raise ValueError("You have to specify either input_ids or inputs_embeds or encoder_embeds") + + # past_key_values_length + past_key_values_length = past_key_values[0][0].shape[2] if past_key_values is not None else 0 + + if attention_mask is None: + attention_mask = torch.ones(((batch_size, seq_length + past_key_values_length)), device=device) + + # We can provide a self-attention mask of dimensions [batch_size, from_seq_length, to_seq_length] + # ourselves in which case we just need to make it broadcastable to all heads. + extended_attention_mask: torch.Tensor = self.get_extended_attention_mask(attention_mask, input_shape, + device, is_decoder) + + # If a 2D or 3D attention mask is provided for the cross-attention + # we need to make broadcastable to [batch_size, num_heads, seq_length, seq_length] + if encoder_hidden_states is not None: + if type(encoder_hidden_states) == list: + encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states[0].size() + else: + encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size() + encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length) + + if type(encoder_attention_mask) == list: + encoder_extended_attention_mask = [self.invert_attention_mask(mask) for mask in encoder_attention_mask] + elif encoder_attention_mask is None: + encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device) + encoder_extended_attention_mask = self.invert_attention_mask(encoder_attention_mask) + else: + encoder_extended_attention_mask = self.invert_attention_mask(encoder_attention_mask) + else: + encoder_extended_attention_mask = None + + # Prepare head mask if needed + # 1.0 in head_mask indicate we keep the head + # attention_probs has shape bsz x n_heads x N x N + # input head_mask has shape [num_heads] or [num_hidden_layers x num_heads] + # and head_mask is converted to shape [num_hidden_layers x batch x num_heads x seq_length x seq_length] + head_mask = self.get_head_mask(head_mask, self.config.num_hidden_layers) + + if encoder_embeds is None: + embedding_output = self.embeddings( + input_ids=input_ids, + position_ids=position_ids, + inputs_embeds=inputs_embeds, + past_key_values_length=past_key_values_length, + ) + else: + embedding_output = encoder_embeds + + encoder_outputs = self.encoder( + embedding_output, + attention_mask=extended_attention_mask, + head_mask=head_mask, + encoder_hidden_states=encoder_hidden_states, + encoder_attention_mask=encoder_extended_attention_mask, + past_key_values=past_key_values, + use_cache=use_cache, + output_attentions=output_attentions, + output_hidden_states=output_hidden_states, + return_dict=return_dict, + mode=mode, + ) + sequence_output = encoder_outputs[0] + pooled_output = self.pooler(sequence_output) if self.pooler is not None else None + + if not return_dict: + return (sequence_output, pooled_output) + encoder_outputs[1:] + + return BaseModelOutputWithPoolingAndCrossAttentions( + last_hidden_state=sequence_output, + pooler_output=pooled_output, + past_key_values=encoder_outputs.past_key_values, + hidden_states=encoder_outputs.hidden_states, + attentions=encoder_outputs.attentions, + cross_attentions=encoder_outputs.cross_attentions, + ) + + + +class BertLMHeadModel(BertPreTrainedModel): + + _keys_to_ignore_on_load_unexpected = [r"pooler"] + _keys_to_ignore_on_load_missing = [r"position_ids", r"predictions.decoder.bias"] + + def __init__(self, config): + super().__init__(config) + + self.bert = BertModel(config, add_pooling_layer=False) + self.cls = BertOnlyMLMHead(config) + + self.init_weights() + + def get_output_embeddings(self): + return self.cls.predictions.decoder + + def set_output_embeddings(self, new_embeddings): + self.cls.predictions.decoder = new_embeddings + + def forward( + self, + input_ids=None, + attention_mask=None, + position_ids=None, + head_mask=None, + inputs_embeds=None, + encoder_hidden_states=None, + encoder_attention_mask=None, + labels=None, + past_key_values=None, + use_cache=None, + output_attentions=None, + output_hidden_states=None, + return_dict=None, + return_logits=False, + is_decoder=True, + reduction='mean', + mode='multimodal', + ): + r""" + encoder_hidden_states (:obj:`torch.FloatTensor` of shape :obj:`(batch_size, sequence_length, hidden_size)`, `optional`): + Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention if + the model is configured as a decoder. + encoder_attention_mask (:obj:`torch.FloatTensor` of shape :obj:`(batch_size, sequence_length)`, `optional`): + Mask to avoid performing attention on the padding token indices of the encoder input. This mask is used in + the cross-attention if the model is configured as a decoder. Mask values selected in ``[0, 1]``: + - 1 for tokens that are **not masked**, + - 0 for tokens that are **masked**. + labels (:obj:`torch.LongTensor` of shape :obj:`(batch_size, sequence_length)`, `optional`): + Labels for computing the left-to-right language modeling loss (next word prediction). Indices should be in + ``[-100, 0, ..., config.vocab_size]`` (see ``input_ids`` docstring) Tokens with indices set to ``-100`` are + ignored (masked), the loss is only computed for the tokens with labels n ``[0, ..., config.vocab_size]`` + past_key_values (:obj:`tuple(tuple(torch.FloatTensor))` of length :obj:`config.n_layers` with each tuple having 4 tensors of shape :obj:`(batch_size, num_heads, sequence_length - 1, embed_size_per_head)`): + Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up decoding. + If :obj:`past_key_values` are used, the user can optionally input only the last :obj:`decoder_input_ids` + (those that don't have their past key value states given to this model) of shape :obj:`(batch_size, 1)` + instead of all :obj:`decoder_input_ids` of shape :obj:`(batch_size, sequence_length)`. + use_cache (:obj:`bool`, `optional`): + If set to :obj:`True`, :obj:`past_key_values` key value states are returned and can be used to speed up + decoding (see :obj:`past_key_values`). + Returns: + Example:: + >>> from transformers import BertTokenizer, BertLMHeadModel, BertConfig + >>> import torch + >>> tokenizer = BertTokenizer.from_pretrained('bert-base-cased') + >>> config = BertConfig.from_pretrained("bert-base-cased") + >>> model = BertLMHeadModel.from_pretrained('bert-base-cased', config=config) + >>> inputs = tokenizer("Hello, my dog is cute", return_tensors="pt") + >>> outputs = model(**inputs) + >>> prediction_logits = outputs.logits + """ + return_dict = return_dict if return_dict is not None else self.config.use_return_dict + if labels is not None: + use_cache = False + + outputs = self.bert( + input_ids, + attention_mask=attention_mask, + position_ids=position_ids, + head_mask=head_mask, + inputs_embeds=inputs_embeds, + encoder_hidden_states=encoder_hidden_states, + encoder_attention_mask=encoder_attention_mask, + past_key_values=past_key_values, + use_cache=use_cache, + output_attentions=output_attentions, + output_hidden_states=output_hidden_states, + return_dict=return_dict, + is_decoder=is_decoder, + mode=mode, + ) + + sequence_output = outputs[0] + prediction_scores = self.cls(sequence_output) + + if return_logits: + return prediction_scores[:, :-1, :].contiguous() + + lm_loss = None + if labels is not None: + # we are doing next-token prediction; shift prediction scores and input ids by one + shifted_prediction_scores = prediction_scores[:, :-1, :].contiguous() + labels = labels[:, 1:].contiguous() + loss_fct = CrossEntropyLoss(reduction=reduction, label_smoothing=0.1) + lm_loss = loss_fct(shifted_prediction_scores.view(-1, self.config.vocab_size), labels.view(-1)) + if reduction=='none': + lm_loss = lm_loss.view(prediction_scores.size(0),-1).sum(1) + + if not return_dict: + output = (prediction_scores,) + outputs[2:] + return ((lm_loss,) + output) if lm_loss is not None else output + + return CausalLMOutputWithCrossAttentions( + loss=lm_loss, + logits=prediction_scores, + past_key_values=outputs.past_key_values, + hidden_states=outputs.hidden_states, + attentions=outputs.attentions, + cross_attentions=outputs.cross_attentions, + ) + + def prepare_inputs_for_generation(self, input_ids, past=None, attention_mask=None, **model_kwargs): + input_shape = input_ids.shape + # if model is used as a decoder in encoder-decoder model, the decoder attention mask is created on the fly + if attention_mask is None: + attention_mask = input_ids.new_ones(input_shape) + + # cut decoder_input_ids if past is used + if past is not None: + input_ids = input_ids[:, -1:] + + return { + "input_ids": input_ids, + "attention_mask": attention_mask, + "past_key_values": past, + "encoder_hidden_states": model_kwargs.get("encoder_hidden_states", None), + "encoder_attention_mask": model_kwargs.get("encoder_attention_mask", None), + "is_decoder": True, + } + + def _reorder_cache(self, past, beam_idx): + reordered_past = () + for layer_past in past: + reordered_past += (tuple(past_state.index_select(0, beam_idx) for past_state in layer_past),) + return reordered_past diff --git a/sd_webui_bayesian_merger/models/BLIP/vit.py b/sd_webui_bayesian_merger/models/BLIP/vit.py new file mode 100644 index 0000000..7e5cf43 --- /dev/null +++ b/sd_webui_bayesian_merger/models/BLIP/vit.py @@ -0,0 +1,301 @@ +''' + * Adapted from BLIP (https://github.com/salesforce/BLIP) + * Based on timm code base + * https://github.com/rwightman/pytorch-image-models/tree/master/timm +''' + +import torch +import torch.nn as nn +import torch.nn.functional as F +from functools import partial + +from timm.models.vision_transformer import _cfg, PatchEmbed +from timm.models.registry import register_model +from timm.models.layers import trunc_normal_, DropPath +from timm.models.helpers import named_apply, adapt_input_conv + +from fairscale.nn.checkpoint.checkpoint_activations import checkpoint_wrapper + +class Mlp(nn.Module): + """ MLP as used in Vision Transformer, MLP-Mixer and related networks + """ + def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.): + super().__init__() + out_features = out_features or in_features + hidden_features = hidden_features or in_features + self.fc1 = nn.Linear(in_features, hidden_features) + self.act = act_layer() + self.fc2 = nn.Linear(hidden_features, out_features) + self.drop = nn.Dropout(drop) + + def forward(self, x): + x = self.fc1(x) + x = self.act(x) + x = self.drop(x) + x = self.fc2(x) + x = self.drop(x) + return x + + +class Attention(nn.Module): + def __init__(self, dim, num_heads=8, qkv_bias=False, qk_scale=None, attn_drop=0., proj_drop=0.): + super().__init__() + self.num_heads = num_heads + head_dim = dim // num_heads + # NOTE scale factor was wrong in my original version, can set manually to be compat with prev weights + self.scale = qk_scale or head_dim ** -0.5 + self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias) + self.attn_drop = nn.Dropout(attn_drop) + self.proj = nn.Linear(dim, dim) + self.proj_drop = nn.Dropout(proj_drop) + self.attn_gradients = None + self.attention_map = None + + def save_attn_gradients(self, attn_gradients): + self.attn_gradients = attn_gradients + + def get_attn_gradients(self): + return self.attn_gradients + + def save_attention_map(self, attention_map): + self.attention_map = attention_map + + def get_attention_map(self): + return self.attention_map + + def forward(self, x, register_hook=False): + B, N, C = x.shape + qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4) + q, k, v = qkv[0], qkv[1], qkv[2] # make torchscript happy (cannot use tensor as tuple) + + attn = (q @ k.transpose(-2, -1)) * self.scale + attn = attn.softmax(dim=-1) + attn = self.attn_drop(attn) + + if register_hook: + self.save_attention_map(attn) + attn.register_hook(self.save_attn_gradients) + + x = (attn @ v).transpose(1, 2).reshape(B, N, C) + x = self.proj(x) + x = self.proj_drop(x) + return x + + +class Block(nn.Module): + + def __init__(self, dim, num_heads, mlp_ratio=4., qkv_bias=False, qk_scale=None, drop=0., attn_drop=0., + drop_path=0., act_layer=nn.GELU, norm_layer=nn.LayerNorm, use_grad_checkpointing=False): + super().__init__() + self.norm1 = norm_layer(dim) + self.attn = Attention( + dim, num_heads=num_heads, qkv_bias=qkv_bias, qk_scale=qk_scale, attn_drop=attn_drop, proj_drop=drop) + # NOTE: drop path for stochastic depth, we shall see if this is better than dropout here + self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity() + self.norm2 = norm_layer(dim) + mlp_hidden_dim = int(dim * mlp_ratio) + self.mlp = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop) + + if use_grad_checkpointing: + self.attn = checkpoint_wrapper(self.attn) + self.mlp = checkpoint_wrapper(self.mlp) + + def forward(self, x, register_hook=False): + x = x + self.drop_path(self.attn(self.norm1(x), register_hook=register_hook)) + x = x + self.drop_path(self.mlp(self.norm2(x))) + return x + + +class VisionTransformer(nn.Module): + """ Vision Transformer + A PyTorch impl of : `An Image is Worth 16x16 Words: Transformers for Image Recognition at Scale` - + https://arxiv.org/abs/2010.11929 + """ + def __init__(self, img_size=224, patch_size=16, in_chans=3, num_classes=1000, embed_dim=768, depth=12, + num_heads=12, mlp_ratio=4., qkv_bias=True, qk_scale=None, representation_size=None, + drop_rate=0., attn_drop_rate=0., drop_path_rate=0., norm_layer=None, + use_grad_checkpointing=False, ckpt_layer=0): + """ + Args: + img_size (int, tuple): input image size + patch_size (int, tuple): patch size + in_chans (int): number of input channels + num_classes (int): number of classes for classification head + embed_dim (int): embedding dimension + depth (int): depth of transformer + num_heads (int): number of attention heads + mlp_ratio (int): ratio of mlp hidden dim to embedding dim + qkv_bias (bool): enable bias for qkv if True + qk_scale (float): override default qk scale of head_dim ** -0.5 if set + representation_size (Optional[int]): enable and set representation layer (pre-logits) to this value if set + drop_rate (float): dropout rate + attn_drop_rate (float): attention dropout rate + drop_path_rate (float): stochastic depth rate + norm_layer: (nn.Module): normalization layer + """ + super().__init__() + self.num_features = self.embed_dim = embed_dim # num_features for consistency with other models + norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6) + + self.patch_embed = PatchEmbed( + img_size=img_size, patch_size=patch_size, in_chans=in_chans, embed_dim=embed_dim) + + num_patches = self.patch_embed.num_patches + + self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) + self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + 1, embed_dim)) + self.pos_drop = nn.Dropout(p=drop_rate) + + dpr = [x.item() for x in torch.linspace(0, drop_path_rate, depth)] # stochastic depth decay rule + self.blocks = nn.ModuleList([ + Block( + dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, qk_scale=qk_scale, + drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr[i], norm_layer=norm_layer, + use_grad_checkpointing=(use_grad_checkpointing and i>=depth-ckpt_layer) + ) + for i in range(depth)]) + self.norm = norm_layer(embed_dim) + + trunc_normal_(self.pos_embed, std=.02) + trunc_normal_(self.cls_token, std=.02) + self.apply(self._init_weights) + + def _init_weights(self, m): + if isinstance(m, nn.Linear): + trunc_normal_(m.weight, std=.02) + if isinstance(m, nn.Linear) and m.bias is not None: + nn.init.constant_(m.bias, 0) + elif isinstance(m, nn.LayerNorm): + nn.init.constant_(m.bias, 0) + nn.init.constant_(m.weight, 1.0) + + @torch.jit.ignore + def no_weight_decay(self): + return {'pos_embed', 'cls_token'} + + def forward(self, x, register_blk=-1): + B = x.shape[0] + x = self.patch_embed(x) + + cls_tokens = self.cls_token.expand(B, -1, -1) # stole cls_tokens impl from Phil Wang, thanks + x = torch.cat((cls_tokens, x), dim=1) + + x = x + self.pos_embed[:,:x.size(1),:] + x = self.pos_drop(x) + + for i,blk in enumerate(self.blocks): + x = blk(x, register_blk==i) + x = self.norm(x) + + return x + + @torch.jit.ignore() + def load_pretrained(self, checkpoint_path, prefix=''): + _load_weights(self, checkpoint_path, prefix) + + +@torch.no_grad() +def _load_weights(model: VisionTransformer, checkpoint_path: str, prefix: str = ''): + """ Load weights from .npz checkpoints for official Google Brain Flax implementation + """ + import numpy as np + + def _n2p(w, t=True): + if w.ndim == 4 and w.shape[0] == w.shape[1] == w.shape[2] == 1: + w = w.flatten() + if t: + if w.ndim == 4: + w = w.transpose([3, 2, 0, 1]) + elif w.ndim == 3: + w = w.transpose([2, 0, 1]) + elif w.ndim == 2: + w = w.transpose([1, 0]) + return torch.from_numpy(w) + + w = np.load(checkpoint_path) + if not prefix and 'opt/target/embedding/kernel' in w: + prefix = 'opt/target/' + + if hasattr(model.patch_embed, 'backbone'): + # hybrid + backbone = model.patch_embed.backbone + stem_only = not hasattr(backbone, 'stem') + stem = backbone if stem_only else backbone.stem + stem.conv.weight.copy_(adapt_input_conv(stem.conv.weight.shape[1], _n2p(w[f'{prefix}conv_root/kernel']))) + stem.norm.weight.copy_(_n2p(w[f'{prefix}gn_root/scale'])) + stem.norm.bias.copy_(_n2p(w[f'{prefix}gn_root/bias'])) + if not stem_only: + for i, stage in enumerate(backbone.stages): + for j, block in enumerate(stage.blocks): + bp = f'{prefix}block{i + 1}/unit{j + 1}/' + for r in range(3): + getattr(block, f'conv{r + 1}').weight.copy_(_n2p(w[f'{bp}conv{r + 1}/kernel'])) + getattr(block, f'norm{r + 1}').weight.copy_(_n2p(w[f'{bp}gn{r + 1}/scale'])) + getattr(block, f'norm{r + 1}').bias.copy_(_n2p(w[f'{bp}gn{r + 1}/bias'])) + if block.downsample is not None: + block.downsample.conv.weight.copy_(_n2p(w[f'{bp}conv_proj/kernel'])) + block.downsample.norm.weight.copy_(_n2p(w[f'{bp}gn_proj/scale'])) + block.downsample.norm.bias.copy_(_n2p(w[f'{bp}gn_proj/bias'])) + embed_conv_w = _n2p(w[f'{prefix}embedding/kernel']) + else: + embed_conv_w = adapt_input_conv( + model.patch_embed.proj.weight.shape[1], _n2p(w[f'{prefix}embedding/kernel'])) + model.patch_embed.proj.weight.copy_(embed_conv_w) + model.patch_embed.proj.bias.copy_(_n2p(w[f'{prefix}embedding/bias'])) + model.cls_token.copy_(_n2p(w[f'{prefix}cls'], t=False)) + pos_embed_w = _n2p(w[f'{prefix}Transformer/posembed_input/pos_embedding'], t=False) + if pos_embed_w.shape != model.pos_embed.shape: + pos_embed_w = resize_pos_embed( # resize pos embedding when different size from pretrained weights + pos_embed_w, model.pos_embed, getattr(model, 'num_tokens', 1), model.patch_embed.grid_size) + model.pos_embed.copy_(pos_embed_w) + model.norm.weight.copy_(_n2p(w[f'{prefix}Transformer/encoder_norm/scale'])) + model.norm.bias.copy_(_n2p(w[f'{prefix}Transformer/encoder_norm/bias'])) +# if isinstance(model.head, nn.Linear) and model.head.bias.shape[0] == w[f'{prefix}head/bias'].shape[-1]: +# model.head.weight.copy_(_n2p(w[f'{prefix}head/kernel'])) +# model.head.bias.copy_(_n2p(w[f'{prefix}head/bias'])) +# if isinstance(getattr(model.pre_logits, 'fc', None), nn.Linear) and f'{prefix}pre_logits/bias' in w: +# model.pre_logits.fc.weight.copy_(_n2p(w[f'{prefix}pre_logits/kernel'])) +# model.pre_logits.fc.bias.copy_(_n2p(w[f'{prefix}pre_logits/bias'])) + for i, block in enumerate(model.blocks.children()): + block_prefix = f'{prefix}Transformer/encoderblock_{i}/' + mha_prefix = block_prefix + 'MultiHeadDotProductAttention_1/' + block.norm1.weight.copy_(_n2p(w[f'{block_prefix}LayerNorm_0/scale'])) + block.norm1.bias.copy_(_n2p(w[f'{block_prefix}LayerNorm_0/bias'])) + block.attn.qkv.weight.copy_(torch.cat([ + _n2p(w[f'{mha_prefix}{n}/kernel'], t=False).flatten(1).T for n in ('query', 'key', 'value')])) + block.attn.qkv.bias.copy_(torch.cat([ + _n2p(w[f'{mha_prefix}{n}/bias'], t=False).reshape(-1) for n in ('query', 'key', 'value')])) + block.attn.proj.weight.copy_(_n2p(w[f'{mha_prefix}out/kernel']).flatten(1)) + block.attn.proj.bias.copy_(_n2p(w[f'{mha_prefix}out/bias'])) + for r in range(2): + getattr(block.mlp, f'fc{r + 1}').weight.copy_(_n2p(w[f'{block_prefix}MlpBlock_3/Dense_{r}/kernel'])) + getattr(block.mlp, f'fc{r + 1}').bias.copy_(_n2p(w[f'{block_prefix}MlpBlock_3/Dense_{r}/bias'])) + block.norm2.weight.copy_(_n2p(w[f'{block_prefix}LayerNorm_2/scale'])) + block.norm2.bias.copy_(_n2p(w[f'{block_prefix}LayerNorm_2/bias'])) + + +def interpolate_pos_embed(pos_embed_checkpoint, visual_encoder): + # interpolate position embedding + embedding_size = pos_embed_checkpoint.shape[-1] + num_patches = visual_encoder.patch_embed.num_patches + num_extra_tokens = visual_encoder.pos_embed.shape[-2] - num_patches + # height (== width) for the checkpoint position embedding + orig_size = int((pos_embed_checkpoint.shape[-2] - num_extra_tokens) ** 0.5) + # height (== width) for the new position embedding + new_size = int(num_patches ** 0.5) + + if orig_size!=new_size: + # class_token and dist_token are kept unchanged + extra_tokens = pos_embed_checkpoint[:, :num_extra_tokens] + # only the position tokens are interpolated + pos_tokens = pos_embed_checkpoint[:, num_extra_tokens:] + pos_tokens = pos_tokens.reshape(-1, orig_size, orig_size, embedding_size).permute(0, 3, 1, 2) + pos_tokens = torch.nn.functional.interpolate( + pos_tokens, size=(new_size, new_size), mode='bicubic', align_corners=False) + pos_tokens = pos_tokens.permute(0, 2, 3, 1).flatten(1, 2) + new_pos_embed = torch.cat((extra_tokens, pos_tokens), dim=1) + print('reshape position embedding from %d to %d'%(orig_size ** 2,new_size ** 2)) + + return new_pos_embed + else: + return pos_embed_checkpoint \ No newline at end of file diff --git a/sd_webui_bayesian_merger/models/BLIPScore.py b/sd_webui_bayesian_merger/models/BLIPScore.py new file mode 100644 index 0000000..185a7f6 --- /dev/null +++ b/sd_webui_bayesian_merger/models/BLIPScore.py @@ -0,0 +1,113 @@ +''' +@File : BLIPScore.py +@Time : 2023/02/19 20:48:00 +@Auther : Jiazheng Xu +@Contact : xjz22@mails.tsinghua.edu.cn +@Description: BLIPScore. +* Based on BLIP code base +* https://github.com/salesforce/BLIP +''' + +import os +import torch +import torch.nn as nn +import torch.nn.functional as F +from PIL import Image +from sd_webui_bayesian_merger.models.BLIP.blip import load_checkpoint +from sd_webui_bayesian_merger.models.BLIP.blip_pretrain import BLIP_Pretrain +from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize + +try: + from torchvision.transforms import InterpolationMode + BICUBIC = InterpolationMode.BICUBIC +except ImportError: + BICUBIC = Image.BICUBIC + + +def _convert_image_to_rgb(image): + return image.convert("RGB") + + +def _transform(n_px): + return Compose([ + Resize(n_px, interpolation=BICUBIC), + CenterCrop(n_px), + _convert_image_to_rgb, + ToTensor(), + Normalize((0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711)), + ]) + + +class BLIPScore(nn.Module): + def __init__(self, pathname, med_config, device='cpu'): + super().__init__() + self.device = device + + self.preprocess = _transform(224) + self.blip = BLIP_Pretrain(image_size=224, vit='large', med_config=med_config) + + state_dict = torch.load(pathname, map_location='cpu') + self.load_state_dict(state_dict, strict=False) + self.to(self.device) + + def score(self, prompt, image): + + if (type(image).__name__=='list'): + _, rewards = self.inference_rank(prompt, image) + return rewards + + # text encode + text_input = self.blip.tokenizer(prompt, padding='max_length', truncation=True, max_length=35, return_tensors="pt").to(self.device) + text_output = self.blip.text_encoder(text_input.input_ids, attention_mask = text_input.attention_mask, mode='text') + txt_feature = F.normalize(self.blip.text_proj(text_output.last_hidden_state[:,0,:])) + + # image encode + if isinstance(image, Image.Image): + pil_image = image + elif isinstance(image, str): + if os.path.isfile(image): + pil_image = Image.open(image) + image = self.preprocess(pil_image).unsqueeze(0).to(self.device) + image_embeds = self.blip.visual_encoder(image) + image_features = F.normalize(self.blip.vision_proj(image_embeds[:,0,:]), dim=-1) + + # score + rewards = torch.sum(torch.mul(txt_feature, image_features), dim=1, keepdim=True) + + score = rewards.detach().cpu().numpy().item() + score += 2.5 + score *= 2 + if score < 0: + score = 0 + if score > 10: + score = 10 + return score + + + def inference_rank(self, prompt, generations_list): + + text_input = self.blip.tokenizer(prompt, padding='max_length', truncation=True, max_length=35, return_tensors="pt").to(self.device) + text_output = self.blip.text_encoder(text_input.input_ids, attention_mask = text_input.attention_mask, mode='text') + txt_feature = F.normalize(self.blip.text_proj(text_output.last_hidden_state[:,0,:])) + + txt_set = [] + img_set = [] + for generations in generations_list: + # image encode + img_path = generations + pil_image = Image.open(img_path) + image = self.preprocess(pil_image).unsqueeze(0).to(self.device) + image_embeds = self.blip.visual_encoder(image) + image_features = F.normalize(self.blip.vision_proj(image_embeds[:,0,:]), dim=-1) + img_set.append(image_features) + txt_set.append(txt_feature) + + txt_features = torch.cat(txt_set, 0).float() # [image_num, feature_dim] + img_features = torch.cat(img_set, 0).float() # [image_num, feature_dim] + rewards = torch.sum(torch.mul(txt_features, img_features), dim=1, keepdim=True) + rewards = torch.squeeze(rewards) + _, rank = torch.sort(rewards, dim=0, descending=True) + _, indices = torch.sort(rank, dim=0) + indices = indices + 1 + + return indices.detach().cpu().numpy().tolist(), rewards.detach().cpu().numpy().tolist() \ No newline at end of file diff --git a/sd_webui_bayesian_merger/models/CLIPScore.py b/sd_webui_bayesian_merger/models/CLIPScore.py new file mode 100644 index 0000000..c2d8f2c --- /dev/null +++ b/sd_webui_bayesian_merger/models/CLIPScore.py @@ -0,0 +1,102 @@ +''' +@File : CLIPScore.py +@Time : 2023/02/12 13:14:00 +@Auther : Jiazheng Xu +@Contact : xjz22@mails.tsinghua.edu.cn +@Description: CLIPScore. +* Based on CLIP code base +* https://github.com/openai/CLIP +''' +import os + +import torch +import torch.nn as nn +import torch.nn.functional as F +from PIL import Image +import clip + + +class CLIPScore(nn.Module): + def __init__(self, pathname, device='cpu'): + super().__init__() + self.device = device + self.clip_model, self.preprocess = clip.load(pathname, device=self.device, jit=False) + + if device == "cpu": + self.clip_model.float() + else: + clip.model.convert_weights( + self.clip_model) # Actually this line is unnecessary since clip by default already on float16 + + # have clip.logit_scale require no grad. + self.clip_model.logit_scale.requires_grad_(False) + + def score(self, prompt, image): + + if (type(image).__name__ == 'list'): + _, rewards = self.inference_rank(prompt, image) + return rewards + + # text encode + text = clip.tokenize(prompt, truncate=True).to(self.device) + txt_features = F.normalize(self.clip_model.encode_text(text)) + + # image encode + if isinstance(image, Image.Image): + pil_image = image + elif isinstance(image, str): + if os.path.isfile(image): + pil_image = Image.open(image) + image = self.preprocess(pil_image).unsqueeze(0).to(self.device) + image_features = F.normalize(self.clip_model.encode_image(image)) + + # score + rewards = torch.sum(torch.mul(txt_features, image_features), dim=1, keepdim=True) + + score = rewards.detach().cpu().numpy().item() + score += 1 + score *= 5 + return score + + def inference_rank(self, prompt, generations_list): + + text = clip.tokenize(prompt, truncate=True).to(self.device) + txt_feature = F.normalize(self.clip_model.encode_text(text)) + + txt_set = [] + img_set = [] + for generations in generations_list: + # image encode + img_path = generations + pil_image = Image.open(img_path) + image = self.preprocess(pil_image).unsqueeze(0).to(self.device) + image_features = F.normalize(self.clip_model.encode_image(image)) + img_set.append(image_features) + txt_set.append(txt_feature) + + txt_features = torch.cat(txt_set, 0).float() # [image_num, feature_dim] + img_features = torch.cat(img_set, 0).float() # [image_num, feature_dim] + rewards = torch.sum(torch.mul(txt_features, img_features), dim=1, keepdim=True) + rewards = torch.squeeze(rewards) + _, rank = torch.sort(rewards, dim=0, descending=True) + _, indices = torch.sort(rank, dim=0) + indices = indices + 1 + + return indices.detach().cpu().numpy().tolist(), rewards.detach().cpu().numpy().tolist() + + def features(self, prompt, image, aes_type='v2'): + + # text encode + text = clip.tokenize(prompt, truncate=True).to(self.device) + txt_features = F.normalize(self.clip_model.encode_text(text)) + + # image encode + if isinstance(image, Image.Image): + pil_image = image + elif isinstance(image, str): + if os.path.isfile(image): + pil_image = Image.open(image) + image = self.preprocess(pil_image).unsqueeze(0).to(self.device) + image_features = F.normalize(self.clip_model.encode_image(image)) + + return txt_features, image_features diff --git a/sd_webui_bayesian_merger/models/CafeScore.py b/sd_webui_bayesian_merger/models/CafeScore.py new file mode 100644 index 0000000..b54e30f --- /dev/null +++ b/sd_webui_bayesian_merger/models/CafeScore.py @@ -0,0 +1,37 @@ +import os +import safetensors +import torch +from PIL import Image +from transformers import pipeline, AutoConfig, AutoProcessor, BeitForImageClassification + + +class CafeScore: + def __init__(self, pathname, device='cpu'): + super().__init__() + self.tokenizer = None + self.pipe = None + self.device = device + if self.device == 'cuda': + self.device += ':0' + self.pathname = pathname + self.initialize_model() + + def initialize_model(self): + statedict = safetensors.torch.load_file(self.pathname) + config_pick = AutoConfig.from_pretrained(pretrained_model_name_or_path="cafeai/cafe_aesthetic") + model = BeitForImageClassification.from_pretrained(pretrained_model_name_or_path=None, state_dict=statedict, config=config_pick) + processor = AutoProcessor.from_pretrained(pretrained_model_name_or_path="cafeai/cafe_aesthetic") + self.pipe = pipeline("image-classification", model=model, image_processor=processor, device=self.device) + + def score(self, prompt, image): + if isinstance(image, Image.Image): + pil_image = image + elif isinstance(image, str): + if os.path.isfile(image): + pil_image = Image.open(image) + + score = self.pipe(images=[pil_image], top_k=2)[0] + score = [p for p in score if p['label'] == 'aesthetic'][0]['score'] + score *= 10 + + return score diff --git a/sd_webui_bayesian_merger/models/HPSv2.py b/sd_webui_bayesian_merger/models/HPSv2.py new file mode 100644 index 0000000..b7ba218 --- /dev/null +++ b/sd_webui_bayesian_merger/models/HPSv2.py @@ -0,0 +1,70 @@ +import os +import open_clip +import torch +from PIL import Image +from open_clip import image_transform + + +class HPSv2: + def __init__(self, pathname, device='cpu'): + super().__init__() + self.tokenizer = None + self.model_dict = {} + self.device = device + self.pathname = pathname + self.initialize_model() + + def initialize_model(self): + if not self.model_dict: + model, preprocess_train, preprocess_val = open_clip.create_model_and_transforms( + 'ViT-H-14', + pretrained=str(self.pathname), + precision='amp', + device=self.device, + jit=False, + force_quick_gelu=False, + force_custom_text=False, + force_patch_dropout=False, + force_image_size=None, + pretrained_image=False, + image_mean=None, + image_std=None, + aug_cfg={}, + output_dict=True, + ) + preprocess_val = image_transform( + model.visual.image_size, + is_train=False, + mean=None, + std=None, + resize_longest_max=True, + ) + + self.model_dict['model'] = model + self.model_dict['preprocess_val'] = preprocess_val + + self.tokenizer = open_clip.get_tokenizer('ViT-H-14') + self.model_dict['model'] = model.to(self.device) + self.model_dict['model'].eval() + + def score(self, prompt, image): + preprocess_val = self.model_dict['preprocess_val'] + model = self.model_dict['model'] + if isinstance(image, Image.Image): + pil_image = image + elif isinstance(image, str): + if os.path.isfile(image): + pil_image = Image.open(image) + + image = preprocess_val(pil_image).unsqueeze(0).to(device=self.device, non_blocking=True) + with torch.no_grad(): + text = self.tokenizer([prompt]).to(device=self.device, non_blocking=True) + with torch.cuda.amp.autocast(): + outputs = model(image, text) + image_features, text_features = outputs["image_features"], outputs["text_features"] + + scores = torch.sum(torch.mul(image_features, text_features), dim=1, keepdim=True) + score = scores.cpu().tolist()[0][0] + score += 1 + score *= 5 + return score diff --git a/sd_webui_bayesian_merger/models/ImageReward.py b/sd_webui_bayesian_merger/models/ImageReward.py new file mode 100644 index 0000000..0a40556 --- /dev/null +++ b/sd_webui_bayesian_merger/models/ImageReward.py @@ -0,0 +1,186 @@ +''' +@File : ImageReward.py +@Time : 2023/01/28 19:53:00 +@Auther : Jiazheng Xu +@Contact : xjz22@mails.tsinghua.edu.cn +@Description: ImageReward Reward model. +* Based on CLIP code base and improved-aesthetic-predictor code base +* https://github.com/openai/CLIP +* https://github.com/christophschuhmann/improved-aesthetic-predictor +''' + +import os +import torch +import torch.nn as nn +from PIL import Image +from sd_webui_bayesian_merger.models.BLIP.blip_pretrain import BLIP_Pretrain +from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize + +try: + from torchvision.transforms import InterpolationMode + BICUBIC = InterpolationMode.BICUBIC +except ImportError: + BICUBIC = Image.BICUBIC + + +def _convert_image_to_rgb(image): + return image.convert("RGB") + + +def _transform(n_px): + return Compose([ + Resize(n_px, interpolation=BICUBIC), + CenterCrop(n_px), + _convert_image_to_rgb, + ToTensor(), + Normalize((0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711)), + ]) + + +class MLP(nn.Module): + def __init__(self, input_size): + super().__init__() + self.input_size = input_size + + self.layers = nn.Sequential( + nn.Linear(self.input_size, 1024), + #nn.ReLU(), + nn.Dropout(0.2), + nn.Linear(1024, 128), + #nn.ReLU(), + nn.Dropout(0.2), + nn.Linear(128, 64), + #nn.ReLU(), + nn.Dropout(0.1), + nn.Linear(64, 16), + #nn.ReLU(), + nn.Linear(16, 1) + ) + + # initial MLP param + for name, param in self.layers.named_parameters(): + if 'weight' in name: + nn.init.normal_(param, mean=0.0, std=1.0/(self.input_size+1)) + if 'bias' in name: + nn.init.constant_(param, val=0) + + def forward(self, input): + return self.layers(input) + + +class ImageReward(nn.Module): + def __init__(self, pathname, med_config, device='cpu'): + super().__init__() + self.device = device + + self.blip = BLIP_Pretrain(image_size=224, vit='large', med_config=med_config) + self.preprocess = _transform(224) + self.mlp = MLP(768) + + state_dict = torch.load(pathname, map_location='cpu') + self.load_state_dict(state_dict, strict=False) + self.to(self.device) + + self.mean = 0.16717362830052426 + self.std = 1.0333394966054072 + + + def score_gard(self, prompt_ids, prompt_attention_mask, image): + + image_embeds = self.blip.visual_encoder(image) + # text encode cross attention with image + image_atts = torch.ones(image_embeds.size()[:-1],dtype=torch.long).to(self.device) + text_output = self.blip.text_encoder(prompt_ids, + attention_mask = prompt_attention_mask, + encoder_hidden_states = image_embeds, + encoder_attention_mask = image_atts, + return_dict = True, + ) + + txt_features = text_output.last_hidden_state[:,0,:] # (feature_dim) + rewards = self.mlp(txt_features) + rewards = (rewards - self.mean) / self.std + + return rewards + + + def score(self, prompt, image): + + if (type(image).__name__=='list'): + _, rewards = self.inference_rank(prompt, image) + return rewards + + # text encode + text_input = self.blip.tokenizer(prompt, padding='max_length', truncation=True, max_length=35, return_tensors="pt").to(self.device) + + # image encode + if isinstance(image, Image.Image): + pil_image = image + elif isinstance(image, str): + if os.path.isfile(image): + pil_image = Image.open(image) + else: + raise TypeError(r'This image parameter type has not been supportted yet. Please pass PIL.Image or file path str.') + + image = self.preprocess(pil_image).unsqueeze(0).to(self.device) + image_embeds = self.blip.visual_encoder(image) + + # text encode cross attention with image + image_atts = torch.ones(image_embeds.size()[:-1],dtype=torch.long).to(self.device) + text_output = self.blip.text_encoder(text_input.input_ids, + attention_mask = text_input.attention_mask, + encoder_hidden_states = image_embeds, + encoder_attention_mask = image_atts, + return_dict = True, + ) + + txt_features = text_output.last_hidden_state[:,0,:].float() # (feature_dim) + rewards = self.mlp(txt_features) + rewards = (rewards - self.mean) / self.std + + score = rewards.detach().cpu().numpy().item() + score += 2.5 + score *= 2 + if score < 0: + score = 0 + if score > 10: + score = 10 + return score + + + def inference_rank(self, prompt, generations_list): + + text_input = self.blip.tokenizer(prompt, padding='max_length', truncation=True, max_length=35, return_tensors="pt").to(self.device) + + txt_set = [] + for generation in generations_list: + # image encode + if isinstance(generation, Image.Image): + pil_image = generation + elif isinstance(generation, str): + if os.path.isfile(generation): + pil_image = Image.open(generation) + else: + raise TypeError(r'This image parameter type has not been supportted yet. Please pass PIL.Image or file path str.') + image = self.preprocess(pil_image).unsqueeze(0).to(self.device) + image_embeds = self.blip.visual_encoder(image) + + # text encode cross attention with image + image_atts = torch.ones(image_embeds.size()[:-1],dtype=torch.long).to(self.device) + text_output = self.blip.text_encoder(text_input.input_ids, + attention_mask = text_input.attention_mask, + encoder_hidden_states = image_embeds, + encoder_attention_mask = image_atts, + return_dict = True, + ) + txt_set.append(text_output.last_hidden_state[:,0,:]) + + txt_features = torch.cat(txt_set, 0).float() # [image_num, feature_dim] + rewards = self.mlp(txt_features) # [image_num, 1] + rewards = (rewards - self.mean) / self.std + rewards = torch.squeeze(rewards) + _, rank = torch.sort(rewards, dim=0, descending=True) + _, indices = torch.sort(rank, dim=0) + indices = indices + 1 + + return indices.detach().cpu().numpy().tolist(), rewards.detach().cpu().numpy().tolist() \ No newline at end of file diff --git a/sd_webui_bayesian_merger/models/Laion.py b/sd_webui_bayesian_merger/models/Laion.py new file mode 100644 index 0000000..70f1057 --- /dev/null +++ b/sd_webui_bayesian_merger/models/Laion.py @@ -0,0 +1,102 @@ +""" +@File : Laion.py +@Time : 2023/02/12 14:54:00 +@Auther : Jiazheng Xu +@Contact : xjz22@mails.tsinghua.edu.cn +@Description: AestheticScore. +* Based on improved-aesthetic-predictor code base +* https://github.com/christophschuhmann/improved-aesthetic-predictor +""" +import os + +import open_clip +import torch +import torch.nn as nn +import torch.nn.functional as F +from PIL import Image +import clip + +class MLP(nn.Module): + def __init__(self, input_size): + super().__init__() + self.input_size = input_size + self.layers = nn.Sequential( + nn.Linear(self.input_size, 1024), + # nn.ReLU(), + nn.Dropout(0.2), + nn.Linear(1024, 128), + # nn.ReLU(), + nn.Dropout(0.2), + nn.Linear(128, 64), + # nn.ReLU(), + nn.Dropout(0.1), + + nn.Linear(64, 16), + # nn.ReLU(), + + nn.Linear(16, 1) + ) + + def forward(self, x): + return self.layers(x) + + +class Laion(nn.Module): + def __init__(self, pathname, clip_path, device): + super().__init__() + self.device = device + self.clip_model, self.preprocess = clip.load(clip_path, device=self.device, jit=False) + self.mlp = MLP(768) + state_dict = torch.load(pathname, map_location='cpu') + self.mlp.load_state_dict(state_dict, strict=False) + self.mlp = self.mlp.to(self.device) + self.mlp.eval() + + if device == "cpu": + self.clip_model.float() + else: + clip.model.convert_weights( + self.clip_model) # Actually this line is unnecessary since clip by default already on float16 + + # have clip.logit_scale require no grad. + self.clip_model.logit_scale.requires_grad_(False) + + def score(self, prompt, image): + + if (type(image).__name__ == 'list'): + _, rewards = self.inference_rank(prompt, image) + return rewards + # image encode + if isinstance(image, Image.Image): + pil_image = image + elif isinstance(image, str): + if os.path.isfile(image): + pil_image = Image.open(image) + image = self.preprocess(pil_image).unsqueeze(0).to(self.device) + image_features = F.normalize(self.clip_model.encode_image(image)).float() + + # score + with torch.no_grad(): + rewards = self.mlp(image_features) + + return rewards.detach().cpu().numpy().item() + + def inference_rank(self, prompt, generations_list): + + img_set = [] + for generations in generations_list: + # image encode + img_path = generations + pil_image = Image.open(img_path) + image = self.preprocess(pil_image).unsqueeze(0).to(self.device) + image_features = F.normalize(self.clip_model.encode_image(image)) + img_set.append(image_features) + + img_features = torch.cat(img_set, 0).float() # [image_num, feature_dim] + rewards = self.mlp(img_features) + rewards = torch.squeeze(rewards) + _, rank = torch.sort(rewards, dim=0, descending=True) + _, indices = torch.sort(rank, dim=0) + indices = indices + 1 + + return indices.detach().cpu().numpy().tolist(), rewards.detach().cpu().numpy().tolist() diff --git a/sd_webui_bayesian_merger/models/NoAIScore.py b/sd_webui_bayesian_merger/models/NoAIScore.py new file mode 100644 index 0000000..1f52d08 --- /dev/null +++ b/sd_webui_bayesian_merger/models/NoAIScore.py @@ -0,0 +1,110 @@ +import os +import safetensors +import torch +from torch import nn +from torch.nn import functional as F +import pytorch_lightning as pl +from pytorch_lightning.core.mixins import HyperparametersMixin +from PIL import Image +from transformers import pipeline, AutoConfig, AutoProcessor, BeitForImageClassification +import timm + + +class SyntheticModel(pl.LightningModule, HyperparametersMixin): + def __init__(self): + super().__init__() + self.model = timm.create_model('convnext_large_mlp.clip_laion2b_soup_ft_in12k_in1k_384', + pretrained=False, + num_classes=0) + + self.clf = nn.Sequential( + nn.Linear(1536, 128), + nn.ReLU(inplace=True), + nn.Linear(128, 2)) + + def forward(self, image): + image_features = self.model(image) + return self.clf(image_features) + + +class NoAIScore: + def __init__(self, class_path, real_path, anime_path, device='cpu'): + super().__init__() + self.transform_m = None + self.model_class = None + self.model_real = None + self.model_anime = None + self.class_path = class_path + self.real_path = real_path + self.anime_path = anime_path + self.device = device + if self.device == 'cuda': + self.device += ':0' + self.initialize_model() + + def initialize_model(self): + statedict = safetensors.torch.load_file(self.class_path) + config = AutoConfig.from_pretrained(pretrained_model_name_or_path="cafeai/cafe_style") + model = BeitForImageClassification.from_pretrained(pretrained_model_name_or_path=None, state_dict=statedict, + config=config) + processor = AutoProcessor.from_pretrained(pretrained_model_name_or_path="cafeai/cafe_style") + self.model_class = pipeline("image-classification", model=model, image_processor=processor, + device=self.device) + + statedict = safetensors.torch.load_file(self.anime_path) + config = AutoConfig.from_pretrained(pretrained_model_name_or_path="saltacc/anime-ai-detect") + model = BeitForImageClassification.from_pretrained(pretrained_model_name_or_path=None, state_dict=statedict, + config=config) + processor = AutoProcessor.from_pretrained(pretrained_model_name_or_path="saltacc/anime-ai-detect") + self.model_anime = pipeline("image-classification", model=model, image_processor=processor, + device=self.device) + + self.model_real = SyntheticModel() + statedict = torch.load(self.real_path, map_location='cpu') + self.model_real.load_state_dict(statedict) + self.model_real = self.model_real.to(self.device) + self.model_real.eval() + + transform_config = {'input_size': (3, 384, 384), + 'interpolation': 'bicubic', + 'mean': (0.48145466, 0.4578275, 0.40821073), + 'std': (0.26862954, 0.26130258, 0.27577711), + 'crop_pct': 1.0, + 'crop_mode': 'squash'} + + self.transform_m = timm.data.create_transform(**transform_config, is_training=False) + + def score(self, prompt, image): + if isinstance(image, Image.Image): + pil_image = image + elif isinstance(image, str): + if os.path.isfile(image): + pil_image = Image.open(image) + + tmp = self.model_class(images=[pil_image], top_k=5)[0] + anime_prob = 0 + anime_prob += [p for p in tmp if p['label'] == 'anime'][0]['score'] + anime_prob += [p for p in tmp if p['label'] == '3d'][0]['score'] + anime_prob += [p for p in tmp if p['label'] == 'manga_like'][0]['score'] + anime_prob += [p for p in tmp if p['label'] == 'other'][0]['score'] / 2 + + + real_prob = 0 + real_prob += [p for p in tmp if p['label'] == 'real_life'][0]['score'] + real_prob += [p for p in tmp if p['label'] == 'other'][0]['score'] / 2 + + + tmp = self.model_anime(images=[pil_image], top_k=5)[0] + anime_ai_score = [p for p in tmp if p['label'] == 'human'][0]['score'] + + tmp = self.transform_m(pil_image) + tmp = self.model_real.forward(tmp.unsqueeze(0).to(self.device)) + + y_1 = F.softmax(tmp, dim=1)[:, 1].cpu().detach().numpy() + y_2 = F.softmax(tmp, dim=1)[:, 0].cpu().detach().numpy() + + real_ai_score = y_2.tolist()[0] + + score = (real_prob * real_ai_score + anime_prob * anime_ai_score) * 10 + + return score diff --git a/sd_webui_bayesian_merger/models/PickScore.py b/sd_webui_bayesian_merger/models/PickScore.py new file mode 100644 index 0000000..e6a8a29 --- /dev/null +++ b/sd_webui_bayesian_merger/models/PickScore.py @@ -0,0 +1,72 @@ +import os +import open_clip +import safetensors +import torch +from PIL import Image +from transformers import AutoModel, AutoProcessor, AutoConfig + + +class PickScore: + def __init__(self, pathname, device='cpu'): + super().__init__() + self.tokenizer = None + self.model_dict = {} + self.device = device + self.pathname = pathname + self.initialize_model() + + def initialize_model(self): + if not self.model_dict: + statedict = safetensors.torch.load_file(self.pathname) + config_pick = AutoConfig.from_pretrained(pretrained_model_name_or_path="yuvalkirstain/PickScore_v1") + model = AutoModel.from_pretrained(pretrained_model_name_or_path=None, state_dict=statedict, + config=config_pick) + preprocess_val = AutoProcessor.from_pretrained( + pretrained_model_name_or_path="laion/CLIP-ViT-H-14-laion2B-s32B-b79K") + + self.model_dict['model'] = model + self.model_dict['preprocess_val'] = preprocess_val + + self.tokenizer = open_clip.get_tokenizer('ViT-H-14') + self.model_dict['model'] = model.to(self.device) + self.model_dict['model'].eval() + + def score(self, prompt, image): + preprocess_val = self.model_dict['preprocess_val'] + model = self.model_dict['model'] + if isinstance(image, Image.Image): + pil_image = image + elif isinstance(image, str): + if os.path.isfile(image): + pil_image = Image.open(image) + + image_inputs = preprocess_val( + images=pil_image, + padding=True, + truncation=True, + max_length=77, + return_tensors="pt", + ).to(self.device) + + text_inputs = preprocess_val( + text=prompt, + padding=True, + truncation=True, + max_length=77, + return_tensors="pt", + ).to(self.device) + + with torch.no_grad(): + # embed + image_embs = model.get_image_features(**image_inputs) + image_embs = image_embs / torch.norm(image_embs, dim=-1, keepdim=True) + + text_embs = model.get_text_features(**text_inputs) + text_embs = text_embs / torch.norm(text_embs, dim=-1, keepdim=True) + + scores = torch.sum(torch.mul(text_embs, image_embs), dim=1, keepdim=True) + + score = scores.cpu().tolist()[0][0] + score += 1 + score *= 5 + return score diff --git a/sd_webui_bayesian_merger/models/ShadowScore.py b/sd_webui_bayesian_merger/models/ShadowScore.py new file mode 100644 index 0000000..668889b --- /dev/null +++ b/sd_webui_bayesian_merger/models/ShadowScore.py @@ -0,0 +1,37 @@ +import os +import safetensors +import torch +from PIL import Image +from transformers import pipeline, AutoConfig, AutoProcessor, ViTForImageClassification + + +class ShadowScore: + def __init__(self, pathname, device='cpu'): + super().__init__() + self.tokenizer = None + self.pipe = None + self.device = device + if self.device == 'cuda': + self.device += ':0' + self.pathname = pathname + self.initialize_model() + + def initialize_model(self): + statedict = safetensors.torch.load_file(self.pathname) + config = AutoConfig.from_pretrained(pretrained_model_name_or_path="shadowlilac/aesthetic-shadow") + model = ViTForImageClassification.from_pretrained(pretrained_model_name_or_path=None, state_dict=statedict, config=config) + processor = AutoProcessor.from_pretrained(pretrained_model_name_or_path="shadowlilac/aesthetic-shadow") + self.pipe = pipeline("image-classification", model=model, image_processor=processor, device=self.device) + + def score(self, prompt, image): + if isinstance(image, Image.Image): + pil_image = image + elif isinstance(image, str): + if os.path.isfile(image): + pil_image = Image.open(image) + + score = self.pipe(images=[pil_image])[0] + score = [p for p in score if p['label'] == 'hq'][0]['score'] + score *= 10 + + return score diff --git a/sd_webui_bayesian_merger/models/WDAes.py b/sd_webui_bayesian_merger/models/WDAes.py new file mode 100644 index 0000000..cd30ced --- /dev/null +++ b/sd_webui_bayesian_merger/models/WDAes.py @@ -0,0 +1,94 @@ +import os + +import safetensors +import torch +import torch.nn as nn +import torch.nn.functional as F +from PIL import Image +from transformers import CLIPModel, CLIPConfig, CLIPImageProcessor +import numpy as np + + +class Classifier(torch.nn.Module): + def __init__(self, input_size, hidden_size, output_size): + super().__init__() + self.fc1 = torch.nn.Linear(input_size, hidden_size) + self.fc2 = torch.nn.Linear(hidden_size, hidden_size // 2) + self.fc3 = torch.nn.Linear(hidden_size // 2, output_size) + self.relu = torch.nn.ReLU() + self.sigmoid = torch.nn.Sigmoid() + + def forward(self, x): + x = self.fc1(x) + x = self.relu(x) + x = self.fc2(x) + x = self.relu(x) + x = self.fc3(x) + x = self.sigmoid(x) + return x + + +class WDAes(nn.Module): + def __init__(self, pathname, clip_path, device='cpu'): + super().__init__() + self.device = device + self.preprocess = CLIPImageProcessor.from_pretrained('openai/clip-vit-base-patch32') + config = CLIPConfig.from_pretrained(pretrained_model_name_or_path="openai/clip-vit-base-patch32") + state_dict = safetensors.torch.load_file(clip_path) + self.clip_model = CLIPModel.from_pretrained(pretrained_model_name_or_path=None, state_dict=state_dict, config=config) + self.clip_model = self.clip_model.to(self.device) + self.clip_model.eval() + self.mlp = Classifier(512, 256, 1) + state_dict = torch.load(pathname, map_location='cpu') + self.mlp.load_state_dict(state_dict, strict=False) + self.mlp = self.mlp.to('cpu') + self.mlp.eval() + + if self.device == "cpu": + self.clip_model.float() + + # have clip.logit_scale require no grad. + self.clip_model.logit_scale.requires_grad_(False) + + def score(self, prompt, image): + + if (type(image).__name__ == 'list'): + _, rewards = self.inference_rank(prompt, image) + return rewards + + with torch.no_grad(): + # image encode + if isinstance(image, Image.Image): + pil_image = image + elif isinstance(image, str): + if os.path.isfile(image): + pil_image = Image.open(image) + image = self.preprocess(images=pil_image, return_tensors='pt')['pixel_values'] + image = image.to(self.device) + image_features = self.clip_model.get_image_features(pixel_values=image).cpu().detach().numpy() + + rewards = (image_features / np.linalg.norm(image_features)).squeeze(axis=0) + reward = self.mlp(torch.from_numpy(rewards)).float().item() + + reward = reward * 10 + return reward + + def inference_rank(self, prompt, generations_list): + + img_set = [] + for generations in generations_list: + # image encode + img_path = generations + pil_image = Image.open(img_path) + image = self.preprocess(pil_image).unsqueeze(0).to(self.device) + image_features = F.normalize(self.clip_model.encode_image(image)) + img_set.append(image_features) + + img_features = torch.cat(img_set, 0).float() # [image_num, feature_dim] + rewards = self.mlp(img_features) + rewards = torch.squeeze(rewards) + _, rank = torch.sort(rewards, dim=0, descending=True) + _, indices = torch.sort(rank, dim=0) + indices = indices + 1 + + return indices.detach().cpu().numpy().tolist(), rewards.detach().cpu().numpy().tolist() diff --git a/sd_webui_bayesian_merger/models/__init__.py b/sd_webui_bayesian_merger/models/__init__.py new file mode 100644 index 0000000..5496e68 --- /dev/null +++ b/sd_webui_bayesian_merger/models/__init__.py @@ -0,0 +1,4 @@ +from .Laion import * +from .BLIPScore import * +from .CLIPScore import * +from .BLIP import * \ No newline at end of file diff --git a/sd_webui_bayesian_merger/optimiser.py b/sd_webui_bayesian_merger/optimiser.py index 82301c0..48abb42 100644 --- a/sd_webui_bayesian_merger/optimiser.py +++ b/sd_webui_bayesian_merger/optimiser.py @@ -4,6 +4,7 @@ from dataclasses import dataclass from pathlib import Path from typing import Dict, List, Tuple +import torch from bayes_opt.logger import JSONLogger from hydra.core.hydra_config import HydraConfig @@ -30,7 +31,7 @@ def __post_init__(self) -> None: self.generator = Generator(self.cfg.url, self.cfg.batch_size) self.merger = Merger(self.cfg) self.start_logging() - self.scorer = AestheticScorer(self.cfg) + self.scorer = AestheticScorer(self.cfg, {}, {}, {}) self.prompter = Prompter(self.cfg) self.iteration = 0 @@ -91,7 +92,7 @@ def print_iteration_info(iteration_type: str): images, gen_paths, payloads = self.generate_images() scores, norm = self.score_images(images, gen_paths, payloads) - avg_score = self.scorer.average_score(scores, norm) + avg_score = self.scorer.average_calc(scores, norm, self.cfg.img_average_type) self.update_best_score(bases, weights, avg_score) return avg_score diff --git a/sd_webui_bayesian_merger/scorer.py b/sd_webui_bayesian_merger/scorer.py index 0c3e07b..6572805 100644 --- a/sd_webui_bayesian_merger/scorer.py +++ b/sd_webui_bayesian_merger/scorer.py @@ -4,51 +4,105 @@ from pathlib import Path from typing import Dict, List -import clip import requests -import safetensors.torch import torch -import torch.nn as nn from hydra.core.hydra_config import HydraConfig -from omegaconf import DictConfig +from omegaconf import DictConfig, open_dict from PIL import Image, PngImagePlugin +from sd_webui_bayesian_merger.models.Laion import Laion as AES +from sd_webui_bayesian_merger.models.ImageReward import ImageReward as IMGR +from sd_webui_bayesian_merger.models.CLIPScore import CLIPScore as CLP +from sd_webui_bayesian_merger.models.BLIPScore import BLIPScore as BLP +from sd_webui_bayesian_merger.models.HPSv2 import HPSv2 as HPS +from sd_webui_bayesian_merger.models.PickScore import PickScore as PICK +from sd_webui_bayesian_merger.models.WDAes import WDAes as WDA +from sd_webui_bayesian_merger.models.ShadowScore import ShadowScore as SS +from sd_webui_bayesian_merger.models.CafeScore import CafeScore as CAFE +from sd_webui_bayesian_merger.models.NoAIScore import NoAIScore as NOAI LAION_URL = ( - "https://github.com/Xerxemi/sdweb-auto-MBW/blob/master/scripts/classifiers/laion/" + "https://github.com/grexzen/SD-Chad/blob/main/sac+logos+ava1-l14-linearMSE.pth?raw=true" ) - CHAD_URL = ( - "https://github.com/christophschuhmann/improved-aesthetic-predictor/blob/main/" + "https://github.com/grexzen/SD-Chad/blob/main/chadscorer.pth?raw=true" +) +WDAES_URL = ( + "https://huggingface.co/hakurei/waifu-diffusion-v1-4/resolve/main/models/aes-B32-v0.pth?download=true" +) +IR_URL = ( + "https://huggingface.co/THUDM/ImageReward/resolve/main/ImageReward.pt?download=true" +) +CLIP_URL = ( + "https://openaipublic.azureedge.net/clip/models/b8cca3fd41ae0c99ba7e8951adf17d267cdb84cd88be6f7c2e0eca1737a03836/ViT-L-14.pt?raw=true" +) +BLIP_URL = ( + "https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_large.pth?raw=true" +) +HPSV2_URL = ( + "https://huggingface.co/xswu/HPSv2/resolve/main/HPS_v2.1_compressed.pt?download=true" +) +PICK_URL = ( + "https://huggingface.co/yuvalkirstain/PickScore_v1/resolve/main/model.safetensors?download=true" +) +SHADOW_URL = ( + "https://huggingface.co/shadowlilac/aesthetic-shadow/resolve/main/model.safetensors?download=true" +) +CAFE_URL = ( + "https://huggingface.co/cafeai/cafe_aesthetic/resolve/3bca27c5c0b6021056b1e84e5a18cf1db9fe5d4c/model.safetensors?download=true" +) +CLASS_URL = ( + "https://huggingface.co/cafeai/cafe_style/resolve/d5ae1a7ac05a12ab84732c25f2ea7225d35ac81b/model.safetensors?download=true" +) +REAL_URL = ( + "https://huggingface.co/Sumsub/Sumsub-ffs-synthetic-2.0/resolve/main/synthetic.pt?download=true" +) +ANIME_URL = ( + "https://huggingface.co/saltacc/anime-ai-detect/resolve/e175bb6b5e19cda40bc6c9ad85b138ee7c7ce23a/model.safetensors?download=true" ) -printWSLFlag = 0 - - -class AestheticPredictor(nn.Module): - def __init__(self, input_size): - super().__init__() - self.input_size = input_size - self.layers = nn.Sequential( - nn.Linear(self.input_size, 1024), - nn.Dropout(0.2), - nn.Linear(1024, 128), - nn.Dropout(0.2), - nn.Linear(128, 64), - nn.Dropout(0.1), - nn.Linear(64, 16), - nn.Linear(16, 1), - ) +LAION_MODEL = ( + "Laion.pth" +) +CHAD_MODEL = ( + "Chad.pth" +) +WDAES_MODEL = ( + "WD_Aes.pth" +) +IR_MODEL = ( + "ImageReward.pt" +) +CLIP_MODEL = ( + "CLIP-ViT-L-14.pt" +) +BLIP_MODEL = ( + "BLIP_Large.pth" +) +HPSV2_MODEL = ( + "HPS_v2.1.pt" +) +PICK_MODEL = ( + "Pick-A-Pic.safetensors" +) +SHADOW_MODEL = ( + "Shadow.safetensors" +) +CAFE_MODEL = ( + "Cafe.safetensors" +) - def forward(self, x): - return self.layers(x) +printWSLFlag = 0 @dataclass class AestheticScorer: cfg: DictConfig + scorer_model_name: Dict + model_path: Dict + model: Dict def __post_init__(self): - if self.cfg.scorer_method == "manual": + if "manual" in self.cfg.scorer_method: self.cfg.save_imgs = True if self.cfg.save_imgs: @@ -56,118 +110,195 @@ def __post_init__(self): if not self.imgs_dir.exists(): self.imgs_dir.mkdir() - if self.cfg.scorer_method == "manual": - return + for evaluator in self.cfg.scorer_method: + if evaluator != 'manual': + if evaluator != 'noai': + if self.cfg.scorer_alt_location is not None and evaluator in self.cfg.scorer_alt_location: + self.scorer_model_name[evaluator] = self.cfg.scorer_alt_location[evaluator]['model_name'] + self.model_path[evaluator] = Path(self.cfg.scorer_alt_location[evaluator]['model_dir']) + else: + self.scorer_model_name[evaluator] = eval(f"{evaluator.upper() + '_MODEL'}") + self.model_path[evaluator] = Path( + self.cfg.scorer_model_dir, + self.scorer_model_name[evaluator], + ) + else: + self.scorer_model_name[evaluator] = 'NOAI pipeline' + self.model_path[evaluator] = {} + self.model_path[evaluator]['class'] = Path( + self.cfg.scorer_model_dir, + "Class.safetensors", + ) + self.model_path[evaluator]['real'] = Path( + self.cfg.scorer_model_dir, + "Real.pt", + ) + self.model_path[evaluator]['anime'] = Path( + self.cfg.scorer_model_dir, + "Anime.safetensors", + ) - if self.cfg.scorer_method == "laion": - self.scorer_model_name = "laion-sac-logos-ava-v2.safetensors" - elif self.cfg.scorer_method == "chad": - self.scorer_model_name = "ava+logos-l14-linearMSE.pth" - self.model_path = Path( + with open_dict(self.cfg): + if self.cfg.scorer_device is None: + self.cfg.scorer_device = {} + if evaluator not in self.cfg.scorer_device: + self.cfg.scorer_device[evaluator] = self.cfg.scorer_default_device + with open_dict(self.cfg): + if self.cfg.scorer_weight is None: + self.cfg.scorer_weight = {} + if evaluator not in self.cfg.scorer_weight: + self.cfg.scorer_weight[evaluator] = 1 + if 'clip' not in self.cfg.scorer_method and any( + x in ['laion', 'chad'] for x in self.cfg.scorer_method): + self.model_path['clip'] = Path( + self.cfg.scorer_model_dir, + CLIP_MODEL, + ) + + self.get_models() + self.load_models() + + def get_models(self) -> None: + blip_config = Path( self.cfg.scorer_model_dir, - self.scorer_model_name, + 'med_config.json', ) - self.get_model() - self.load_model() + if not blip_config.is_file(): + url = "https://huggingface.co/THUDM/ImageReward/resolve/main/med_config.json?download=true" - def get_model(self) -> None: - if self.model_path.is_file(): - return + r = requests.get(url) + r.raise_for_status() - print("You do not have an aesthetic model ckpt, let me download that for you") - if self.cfg.scorer_method == "chad": - url = CHAD_URL - elif self.cfg.scorer_method == "laion": - url = LAION_URL + with open(blip_config.absolute(), "wb") as f: + print(f"saved into {blip_config}") + f.write(r.content) - url += f"{self.scorer_model_name}?raw=true" + for evaluator in self.cfg.scorer_method: + if evaluator != 'manual': + if evaluator != 'noai': + if not self.model_path[evaluator].is_file(): + print(f"You do not have the {evaluator.upper()} model, let me download that for you") + url = eval(f"{evaluator.upper() + '_URL'}") - r = requests.get(url) - r.raise_for_status() + r = requests.get(url) + r.raise_for_status() - with open(self.model_path.absolute(), "wb") as f: - print(f"saved into {self.model_path}") - f.write(r.content) + with open(self.model_path[evaluator].absolute(), "wb") as f: + print(f"saved into {self.model_path[evaluator]}") + f.write(r.content) + else: + for m_path in self.model_path[evaluator]: + if not self.model_path[evaluator][m_path].is_file(): + url = eval(f"{m_path.upper() + '_URL'}") - def load_model(self) -> None: - # return in manual mode - if self.cfg.scorer_method == "manual": - return - print(f"Loading {self.scorer_model_name}") + r = requests.get(url) + r.raise_for_status() - if self.cfg.scorer_method in ["chad", "laion"]: - self.model = AestheticPredictor(768).to(self.cfg.scorer_device).eval() + with open(self.model_path[evaluator][m_path].absolute(), "wb") as f: + print(f"saved into {self.model_path[evaluator][m_path]}") + f.write(r.content) - if self.model_path.suffix == ".safetensors": - self.model.load_state_dict( - safetensors.torch.load_file( - self.model_path, - ) - ) - self.model.to(self.cfg.scorer_device) - else: - self.model.load_state_dict( - torch.load( - self.model_path, - map_location=self.cfg.scorer_device, - ) - ) - self.model.eval() - self.load_clip() + if evaluator == 'wdaes': + clip_vit_b_32 = Path( + self.cfg.scorer_model_dir, + "CLIP-ViT-B-32.safetensors", + ) + if not clip_vit_b_32.is_file(): + print( + f"You do not have the CLIP-ViT-B-32 necessary for the wdaes model, let me download that for you") + url = "https://huggingface.co/openai/clip-vit-base-patch32/resolve/b527df4b30e5cc18bde1cc712833a741d2d8c362/model.safetensors?download=true" - def load_clip(self) -> None: - if self.cfg.scorer_method in ["chad", "laion"]: - self.clip_model_name = "ViT-L/14" + r = requests.get(url) + r.raise_for_status() - print(f"Loading {self.clip_model_name}") + with open(clip_vit_b_32.absolute(), "wb") as f: + print(f"saved into {clip_vit_b_32}") + f.write(r.content) - if self.cfg.scorer_method in ["chad", "laion"]: - self.clip_model, self.clip_preprocess = clip.load( - self.clip_model_name, - device=self.cfg.scorer_device, - ) + if ('clip' not in self.cfg.scorer_method and + any(x in ['laion', 'chad'] for x in self.cfg.scorer_method)): + if not self.model_path['clip'].is_file(): + print(f"You do not have the CLIP(which you need) model, let me download that for you") + url = CLIP_URL - def get_image_features(self, image: Image.Image) -> torch.Tensor: - if self.cfg.scorer_method in ["chad", "laion"]: - image = self.clip_preprocess(image).unsqueeze(0).to(self.cfg.scorer_device) - with torch.no_grad(): - image_features = self.clip_model.encode_image(image) - image_features /= image_features.norm(dim=-1, keepdim=True) - image_features = image_features.cpu().detach().numpy() - return image_features - - def score(self, image: Image.Image) -> float: - image_features = self.get_image_features(image) - score = self.model( - torch.from_numpy(image_features).to(self.cfg.scorer_device).float(), + r = requests.get(url) + r.raise_for_status() + + with open(self.model_path['clip'].absolute(), "wb") as f: + print(f"saved into {self.model_path['clip']}") + f.write(r.content) + + def load_models(self) -> None: + med_config = Path( + self.cfg.scorer_model_dir, + "med_config.json" ) + for evaluator in self.cfg.scorer_method: + if evaluator != 'manual': + print(f"Loading {self.scorer_model_name[evaluator]}") + if evaluator == 'wdaes': + clip_vit_b_32 = Path( + self.cfg.scorer_model_dir, + "CLIP-ViT-B-32.safetensors", + ) + self.model[evaluator] = WDA(self.model_path[evaluator], clip_vit_b_32, + self.cfg.scorer_device[evaluator]) + elif evaluator == 'clip': + self.model[evaluator] = CLP(self.model_path[evaluator], self.cfg.scorer_device[evaluator]) + elif evaluator == 'blip': + self.model[evaluator] = BLP(self.model_path[evaluator], med_config, self.cfg.scorer_device[evaluator]) + elif evaluator == 'ir': + self.model[evaluator] = IMGR(self.model_path[evaluator], med_config, self.cfg.scorer_device[evaluator]) + elif evaluator == 'laion' or evaluator == 'chad': + self.model[evaluator] = AES(self.model_path[evaluator], self.model_path['clip'], + self.cfg.scorer_device[evaluator]) + elif evaluator == 'hpsv2': + self.model[evaluator] = HPS(self.model_path[evaluator], self.cfg.scorer_device[evaluator]) + elif evaluator == 'pick': + self.model[evaluator] = PICK(self.model_path[evaluator], self.cfg.scorer_device[evaluator]) + elif evaluator == 'shadow': + self.model[evaluator] = SS(self.model_path[evaluator], self.cfg.scorer_device[evaluator]) + elif evaluator == 'cafe': + self.model[evaluator] = CAFE(self.model_path[evaluator], self.cfg.scorer_device[evaluator]) + elif evaluator == 'noai': + self.model[evaluator] = NOAI(self.model_path[evaluator]['class'], self.model_path[evaluator]['real'], self.model_path[evaluator]['anime'], device=self.cfg.scorer_device[evaluator]) + + def score(self, image: Image.Image, prompt) -> float: + values = [] + weights = [] + for evaluator in self.cfg.scorer_method: + weights.append(int(self.cfg.scorer_weight[evaluator])) + if evaluator == 'manual': + # in manual mode, we save a temp image first then request user input + tmp_path = Path(Path.cwd(), "tmp.png") + image.save(tmp_path) + self.open_image(tmp_path) + values.append(self.get_user_score()) + tmp_path.unlink() # remove temporary image + else: + values.append(self.model[evaluator].score(prompt, image)) + + if self.cfg.scorer_print_individual: + print(f"{evaluator}:{values[-1]}") - return score.item() + score = self.average_calc(values, weights, self.cfg.scorer_average_type) + return score def batch_score( - self, - images: List[Image.Image], - payload_names: List[str], - payloads: Dict, - it: int, + self, + images: List[Image.Image], + payload_names: List[str], + payloads: Dict, + it: int, ) -> List[float]: scores = [] norm = [] for i, (img, name, payload) in enumerate(zip(images, payload_names, payloads)): - # in manual mode, we save a temp image first then request user input - if self.cfg.scorer_method == "manual": - tmp_path = Path(Path.cwd(), "tmp.png") - img.save(tmp_path) - self.open_image(tmp_path) - score = AestheticScorer.get_user_score() - tmp_path.unlink() # remove temporary image - else: - score = self.score(img) + score = self.score(img, payload["prompt"]) if self.cfg.save_imgs: self.save_img(img, name, score, it, i, payload) if "score_weight" in payload: - score *= payload["score_weight"] norm.append(payload["score_weight"]) else: norm.append(1.0) @@ -177,10 +308,31 @@ def batch_score( return scores, norm - def average_score(self, scores: List[float], norm: List[float]) -> float: - num = sum(scores) - den = sum(norm) - return 0.0 if den == 0.0 else num / den + def average_calc(self, values: List[float], weights: List[float], average_type: str) -> float: + norm = 0 + for weight in weights: + norm += weight + avg = 0 + if average_type == 'geometric': + avg = 1 + elif average_type == 'arithmetic' or average_type == 'quadratic': + avg = 0 + + for value, weight in zip(values, weights): + if average_type == 'arithmetic': + avg += value * weight + elif average_type == 'geometric': + avg *= value ** weight + elif average_type == 'quadratic': + avg += (value ** 2) * weight + + if average_type == 'arithmetic': + avg = avg / norm + elif average_type == 'geometric': + avg = avg ** (1 / norm) + elif average_type == 'quadratic': + avg = (avg / norm) ** (1 / 2) + return avg def image_path(self, name: str, score: float, it: int, batch_n: int) -> Path: return Path( @@ -189,13 +341,13 @@ def image_path(self, name: str, score: float, it: int, batch_n: int) -> Path: ) def save_img( - self, - image: Image.Image, - name: str, - score: float, - it: int, - batch_n: int, - payload: Dict, + self, + image: Image.Image, + name: str, + score: float, + it: int, + batch_n: int, + payload: Dict, ) -> Path: img_path = self.image_path(name, score, it, batch_n) pnginfo = PngImagePlugin.PngInfo()