How to Get AI to Do the Work for You - Entertainment (3)
Background
Today's topic leans more toward coding techniques, so it may be a bit challenging for readers with little or no development experience.
The previous article covered how to generate relatively stable video across wide viewpoints. Yesterday's implementation was fairly simple and relied mainly on the WebUI for generation, but the results were clearly not great: character consistency and motion continuity were lacking. The reasons are as follows:
1. The batch image2image mode in the Stable Diffusion WebUI cannot yet do true image2image.
2. Control is actually achieved through a fixed text prompt plus multiple ControlNets.
If you want stable frames, you get better quality, continuity, and stability by combining image-based control information with a slightly different prompt for each frame (image2image controls the overall style and content, ControlNet controls the details, and the prompt handles the per-frame content variation).
This article will not go into the specific details of stable image generation. Instead, it covers something more important: how to reproduce the WebUI's functionality in code, and how to build a more controllable and efficient image-generation pipeline with code.
Contents
1. Building a Stable Diffusion + ControlNet generation pipeline in code
2. Building a multi-ControlNet generation pipeline
3. How to implement and add features that diffusers does not yet provide
Building a Stable Diffusion + ControlNet generation pipeline in code
Install the diffusers packages
pip install --upgrade diffusers accelerate transformers
# To install the latest diffusers from source, run the following command instead
pip install git+https://github.com/huggingface/diffusers
pip install controlnet_hinter==0.0.5
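Before running the script, a quick sanity check that the packages import cleanly and that a CUDA GPU is visible can save time, since everything below moves the pipeline to 'cuda'. This check is not part of the original write-up, just a suggested precaution:
# Optional environment check: confirm package versions and GPU availability.
import torch
import diffusers
import transformers

print("diffusers:", diffusers.__version__, "| transformers:", transformers.__version__)
print("CUDA available:", torch.cuda.is_available())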
Build the Stable Diffusion + ControlNet script
from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
from diffusers.utils import load_image
import torch
import numpy as np
from PIL import Image
import cv2
import controlnet_hinter  # used below to extract pose/depth/segmentation/hough control images

# Load the test image
original_image = load_image("https://huggingface.co/datasets/diffusers/test-arrays/resolve/main/stable_diffusion_imgvar/input_image_vermeer.png")
original_image

# Set up the ControlNet + Stable Diffusion pipeline (canny edges)
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", cache_dir='./')
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()

# Load a pre-extracted canny edge image
canny_edged_image = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_canny_edged.png")
canny_edged_image

# Generate an image with the canny ControlNet
generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(prompt="best quality, extremely detailed",
             negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
             image=canny_edged_image, num_inference_steps=30, generator=generator).images[0]
image

# Tune the canny edge-detection thresholds yourself
control_image = np.array(original_image)
low_threshold = 10    # default=100
high_threshold = 150  # default=200
control_image = cv2.Canny(control_image, low_threshold, high_threshold)
control_image = control_image[:, :, None]
control_image = np.concatenate([control_image, control_image, control_image], axis=2)
control_image = Image.fromarray(control_image)
control_image

# Generate with the canny ControlNet again, this time using the custom control image
generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(prompt="best quality, extremely detailed",
             negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
             image=control_image, num_inference_steps=30, generator=generator).images[0]
image

# OpenPose ControlNet pipeline
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose", cache_dir='./')
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()

# Pose image
pose_image = load_image('https://huggingface.co/takuma104/controlnet_dev/resolve/main/pose.png')
pose_image

# Generate with the pose pipeline
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed, football, a boy",
             negative_prompt="lowres, bad anatomy, worst quality, low quality",
             image=pose_image, generator=generator, num_inference_steps=30).images[0]
image

# Extract a pose control image from the original image with controlnet_hinter
control_image = controlnet_hinter.hint_openpose(original_image)
control_image

# Generate with the pose pipeline again, using the extracted pose
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed",
             negative_prompt="lowres, bad anatomy, worst quality, low quality",
             image=control_image, generator=generator, num_inference_steps=30).images[0]
image

# Depth-map ControlNet pipeline
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-depth")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()

control_image = controlnet_hinter.hint_depth(original_image)
control_image

generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed",
             negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
             image=control_image, generator=generator, num_inference_steps=30).images[0]
image

# Scribble (outline) ControlNet pipeline
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-scribble")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()

scribble_image = load_image('https://github.com/lllyasviel/ControlNet/raw/main/test_imgs/user_1.png')
scribble_image

generator = torch.Generator(device="cpu").manual_seed(1)
image = pipe(prompt="a turtle, best quality, extremely detailed",
             negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
             image=scribble_image, generator=generator, num_inference_steps=30).images[0]
image

# Segmentation ControlNet pipeline
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-seg")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()

control_image = controlnet_hinter.hint_segmentation(original_image)
control_image

generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed",
             negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
             image=control_image, generator=generator, num_inference_steps=30).images[0]
image

# Hough lines (M-LSD) ControlNet pipeline -- mostly useful for architecture and large scenes
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-mlsd")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()

control_image = controlnet_hinter.hint_hough(original_image)
control_image

generator = torch.Generator(device="cpu").manual_seed(2)
image = pipe(prompt="best quality, extremely detailed",
             negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
             image=control_image, generator=generator, num_inference_steps=30).images[0]
image
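With the whole flow in code, the per-frame generation strategy described in the Background section is easy to script: extract a control image for every frame, keep the seed fixed, and vary the prompt slightly. The loop below is only an illustrative sketch; the frame directory, the prompt, and the choice of the OpenPose hint are assumptions rather than part of the original notebook.
# Illustrative frame-by-frame loop (assumed paths and prompts): one ControlNet pipeline,
# a per-frame control image, and a fixed seed for cross-frame consistency.
import os
import glob

pose_pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    controlnet=ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose"),
    safety_checker=None,
).to("cuda")
pose_pipe.enable_xformers_memory_efficient_attention()

os.makedirs("./generated", exist_ok=True)
frame_paths = sorted(glob.glob("./frames/*.png"))  # assumed directory of extracted video frames
for i, path in enumerate(frame_paths):
    control = controlnet_hinter.hint_openpose(load_image(path))  # per-frame pose hint
    prompt = "best quality, extremely detailed, a boy playing football"  # vary slightly per frame if needed
    generator = torch.Generator(device="cpu").manual_seed(0)  # fixed seed across frames
    out = pose_pipe(prompt=prompt,
                    negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
                    image=control, num_inference_steps=30, generator=generator).images[0]
    out.save(f"./generated/{i:04d}.png")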
Building a multi-ControlNet generation pipeline
The diffusers package does not yet implement image generation driven by multiple ControlNets at once; to use multi-ControlNet you can use the following code.
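(Side note: later diffusers releases added native multi-ControlNet support, where StableDiffusionControlNetPipeline accepts a list of ControlNets and a matching list of control images. If you are on a recent enough version, something along the following lines may already work and the standalone implementation below is only needed for older releases; treat the snippet as a version-dependent sketch that reuses the control images prepared in the previous section.)
# Version-dependent sketch: newer diffusers releases accept a list of ControlNets directly.
controlnet_canny = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny")
controlnet_pose = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose")
multi_pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    controlnet=[controlnet_canny, controlnet_pose],
    safety_checker=None,
).to("cuda")
image = multi_pipe(prompt="best quality, extremely detailed",
                   negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
                   image=[canny_edged_image, pose_image],  # one control image per ControlNet
                   num_inference_steps=30).images[0]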
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
from typing import Any, Callable, Dict, List, Optional, Union, Tuple

import numpy as np
import PIL.Image
import torch
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer

from diffusers import AutoencoderKL, ControlNetModel, UNet2DConditionModel
from diffusers.schedulers import KarrasDiffusionSchedulers
from diffusers.utils import (
    PIL_INTERPOLATION,
    is_accelerate_available,
    is_accelerate_version,
    logging,
    randn_tensor,
    replace_example_docstring,
)
from diffusers.pipeline_utils import DiffusionPipeline
from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput, StableDiffusionSafetyChecker
from diffusers.models.controlnet import ControlNetOutputlogger = logging.get_logger(__name__) # pylint: disable=invalid-nameclass ControlNetProcessor(object):def __init__(self,controlnet: ControlNetModel,image: Union[torch.FloatTensor, PIL.Image.Image, List[torch.FloatTensor], List[PIL.Image.Image]],conditioning_scale: float = 1.0,):self.controlnet = controlnetself.image = imageself.conditioning_scale = conditioning_scaledef _default_height_width(self, height, width, image):if isinstance(image, list):image = image[0]if height is None:if isinstance(image, PIL.Image.Image):height = image.heightelif isinstance(image, torch.Tensor):height = image.shape[3]height = (height // 8) * 8 # round down to nearest multiple of 8if width is None:if isinstance(image, PIL.Image.Image):width = image.widthelif isinstance(image, torch.Tensor):width = image.shape[2]width = (width // 8) * 8 # round down to nearest multiple of 8return height, widthdef default_height_width(self, height, width):return self._default_height_width(height, width, self.image)def _prepare_image(self, image, width, height, batch_size, num_images_per_prompt, device, dtype):if not isinstance(image, torch.Tensor):if isinstance(image, PIL.Image.Image):image = [image]if isinstance(image[0], PIL.Image.Image):image = [np.array(i.resize((width, height), resample=PIL_INTERPOLATION["lanczos"]))[None, :] for i in image]image = np.concatenate(image, axis=0)image = np.array(image).astype(np.float32) / 255.0image = image.transpose(0, 3, 1, 2)image = torch.from_numpy(image)elif isinstance(image[0], torch.Tensor):image = torch.cat(image, dim=0)image_batch_size = image.shape[0]if image_batch_size == 1:repeat_by = batch_sizeelse:# image batch size is the same as prompt batch sizerepeat_by = num_images_per_promptimage = image.repeat_interleave(repeat_by, dim=0)image = image.to(device=device, dtype=dtype)return imagedef _check_inputs(self, image, prompt, prompt_embeds):image_is_pil = isinstance(image, PIL.Image.Image)image_is_tensor = isinstance(image, torch.Tensor)image_is_pil_list = isinstance(image, list) and isinstance(image[0], PIL.Image.Image)image_is_tensor_list = isinstance(image, list) and isinstance(image[0], torch.Tensor)if not image_is_pil and not image_is_tensor and not image_is_pil_list and not image_is_tensor_list:raise TypeError("image must be passed and be one of PIL image, torch tensor, list of PIL images, or list of torch tensors")if image_is_pil:image_batch_size = 1elif image_is_tensor:image_batch_size = image.shape[0]elif image_is_pil_list:image_batch_size = len(image)elif image_is_tensor_list:image_batch_size = len(image)if prompt is not None and isinstance(prompt, str):prompt_batch_size = 1elif prompt is not None and isinstance(prompt, list):prompt_batch_size = len(prompt)elif prompt_embeds is not None:prompt_batch_size = prompt_embeds.shape[0]if image_batch_size != 1 and image_batch_size != prompt_batch_size:raise ValueError(f"If image batch size is not 1, image batch size must be same as prompt batch size. 
image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size}")def check_inputs(self, prompt, prompt_embeds):self._check_inputs(self.image, prompt, prompt_embeds)def prepare_image(self, width, height, batch_size, num_images_per_prompt, device, do_classifier_free_guidance):self.image = self._prepare_image(self.image, width, height, batch_size, num_images_per_prompt, device, self.controlnet.dtype)if do_classifier_free_guidance:self.image = torch.cat([self.image] * 2)def __call__(self,sample: torch.FloatTensor,timestep: Union[torch.Tensor, float, int],encoder_hidden_states: torch.Tensor,class_labels: Optional[torch.Tensor] = None,timestep_cond: Optional[torch.Tensor] = None,attention_mask: Optional[torch.Tensor] = None,cross_attention_kwargs: Optional[Dict[str, Any]] = None,return_dict: bool = True,) -> Tuple:down_block_res_samples, mid_block_res_sample = self.controlnet(sample=sample,controlnet_cond=self.image,timestep=timestep,encoder_hidden_states=encoder_hidden_states,class_labels=class_labels,timestep_cond=timestep_cond,attention_mask=attention_mask,cross_attention_kwargs=cross_attention_kwargs,return_dict=False,)down_block_res_samples = [down_block_res_sample * self.conditioning_scale for down_block_res_sample in down_block_res_samples]mid_block_res_sample *= self.conditioning_scalereturn (down_block_res_samples, mid_block_res_sample)EXAMPLE_DOC_STRING = """Examples:```py>>> # !pip install opencv-python transformers accelerate>>> from diffusers import StableDiffusionControlNetPipeline, ControlNetModel, UniPCMultistepScheduler>>> from diffusers.utils import load_image>>> import numpy as np>>> import torch>>> import cv2>>> from PIL import Image>>> # download an image>>> image = load_image(... "https://hf.co/datasets/huggingface/documentation-images/resolve/main/diffusers/input_image_vermeer.png"... )>>> image = np.array(image)>>> # get canny image>>> image = cv2.Canny(image, 100, 200)>>> image = image[:, :, None]>>> image = np.concatenate([image, image, image], axis=2)>>> canny_image = Image.fromarray(image)>>> # load control net and stable diffusion v1-5>>> controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", torch_dtype=torch.float16)>>> pipe = StableDiffusionControlNetPipeline.from_pretrained(... "runwayml/stable-diffusion-v1-5", controlnet=controlnet, torch_dtype=torch.float16... )>>> # speed up diffusion process with faster scheduler and memory optimization>>> pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config)>>> # remove following line if xformers is not installed>>> pipe.enable_xformers_memory_efficient_attention()>>> pipe.enable_model_cpu_offload()>>> # generate image>>> generator = torch.manual_seed(0)>>> image = pipe(... "futuristic-looking woman", num_inference_steps=20, generator=generator, image=canny_image... ).images[0]```
"""class StableDiffusionMultiControlNetPipeline(DiffusionPipeline):r"""Pipeline for text-to-image generation using Stable Diffusion with ControlNet guidance.This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods thelibrary implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)Args:vae ([`AutoencoderKL`]):Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.text_encoder ([`CLIPTextModel`]):Frozen text-encoder. Stable Diffusion uses the text portion of[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specificallythe [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.tokenizer (`CLIPTokenizer`):Tokenizer of class[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.scheduler ([`SchedulerMixin`]):A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].safety_checker ([`StableDiffusionSafetyChecker`]):Classification module that estimates whether generated images could be considered offensive or harmful.Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.feature_extractor ([`CLIPFeatureExtractor`]):Model that extracts features from generated images to be used as inputs for the `safety_checker`."""_optional_components = ["safety_checker", "feature_extractor"]def __init__(self,vae: AutoencoderKL,text_encoder: CLIPTextModel,tokenizer: CLIPTokenizer,unet: UNet2DConditionModel,scheduler: KarrasDiffusionSchedulers,safety_checker: StableDiffusionSafetyChecker,feature_extractor: CLIPFeatureExtractor,requires_safety_checker: bool = True,):super().__init__()if safety_checker is None and requires_safety_checker:logger.warning(f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"" results in services or applications open to the public. Both the diffusers team and Hugging Face"" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"" it only for use-cases that involve analyzing network behavior or auditing its results. For more"" information, please have a look at https://github.com/huggingface/diffusers/pull/254 .")if safety_checker is not None and feature_extractor is None:raise ValueError("Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead.")self.register_modules(vae=vae,text_encoder=text_encoder,tokenizer=tokenizer,unet=unet,scheduler=scheduler,safety_checker=safety_checker,feature_extractor=feature_extractor,)self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)self.register_to_config(requires_safety_checker=requires_safety_checker)# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.enable_vae_slicingdef enable_vae_slicing(self):r"""Enable sliced VAE decoding.When this option is enabled, the VAE will split the input tensor in slices to compute decoding in severalsteps. 
This is useful to save some memory and allow larger batch sizes."""self.vae.enable_slicing()# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.disable_vae_slicingdef disable_vae_slicing(self):r"""Disable sliced VAE decoding. If `enable_vae_slicing` was previously invoked, this method will go back tocomputing decoding in one step."""self.vae.disable_slicing()def enable_sequential_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,text_encoder, vae, controlnet, and safety checker have their state dicts saved to CPU and then are moved to a`torch.device('meta') and loaded to GPU only when their specific submodule has its `forward` method called.Note that offloading happens on a submodule basis. Memory savings are higher than with`enable_model_cpu_offload`, but performance is lower."""if is_accelerate_available():from accelerate import cpu_offloadelse:raise ImportError("Please install accelerate via `pip install accelerate`")device = torch.device(f"cuda:{gpu_id}")for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae]:cpu_offload(cpu_offloaded_model, device)if self.safety_checker is not None:cpu_offload(self.safety_checker, execution_device=device, offload_buffers=True)def enable_model_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, reducing memory usage with a low impact on performance. Comparedto `enable_sequential_cpu_offload`, this method moves one whole model at a time to the GPU when its `forward`method is called, and the model remains in GPU until the next model runs. Memory savings are lower than with`enable_sequential_cpu_offload`, but performance is much better due to the iterative execution of the `unet`."""if is_accelerate_available() and is_accelerate_version(">=", "0.17.0.dev0"):from accelerate import cpu_offload_with_hookelse:raise ImportError("`enable_model_offload` requires `accelerate v0.17.0` or higher.")device = torch.device(f"cuda:{gpu_id}")hook = Nonefor cpu_offloaded_model in [self.text_encoder, self.unet, self.vae]:_, hook = cpu_offload_with_hook(cpu_offloaded_model, device, prev_module_hook=hook)if self.safety_checker is not None:# the safety checker can offload the vae again_, hook = cpu_offload_with_hook(self.safety_checker, device, prev_module_hook=hook)# control net hook has be manually offloaded as it alternates with unet# cpu_offload_with_hook(self.controlnet, device)# We'll offload the last model manually.self.final_offload_hook = hook@property# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._execution_devicedef _execution_device(self):r"""Returns the device on which the pipeline's models will be executed. 
After calling`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's modulehooks."""if not hasattr(self.unet, "_hf_hook"):return self.devicefor module in self.unet.modules():if (hasattr(module, "_hf_hook")and hasattr(module._hf_hook, "execution_device")and module._hf_hook.execution_device is not None):return torch.device(module._hf_hook.execution_device)return self.device# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_promptdef _encode_prompt(self,prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt=None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,):r"""Encodes the prompt into text encoder hidden states.Args:prompt (`str` or `List[str]`, *optional*):prompt to be encodeddevice: (`torch.device`):torch devicenum_images_per_prompt (`int`):number of images that should be generated per promptdo_classifier_free_guidance (`bool`):whether to use classifier free guidance or notnegative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. 
If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument."""if prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]if prompt_embeds is None:text_inputs = self.tokenizer(prompt,padding="max_length",max_length=self.tokenizer.model_max_length,truncation=True,return_tensors="pt",)text_input_ids = text_inputs.input_idsuntruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_idsif untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):removed_text = self.tokenizer.batch_decode(untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1])logger.warning("The following part of your input was truncated because CLIP can only handle sequences up to"f" {self.tokenizer.model_max_length} tokens: {removed_text}")if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = text_inputs.attention_mask.to(device)else:attention_mask = Noneprompt_embeds = self.text_encoder(text_input_ids.to(device),attention_mask=attention_mask,)prompt_embeds = prompt_embeds[0]prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)bs_embed, seq_len, _ = prompt_embeds.shape# duplicate text embeddings for each generation per prompt, using mps friendly methodprompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)prompt_embeds = prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1)# get unconditional embeddings for classifier free guidanceif do_classifier_free_guidance and negative_prompt_embeds is None:uncond_tokens: List[str]if negative_prompt is None:uncond_tokens = [""] * batch_sizeelif type(prompt) is not type(negative_prompt):raise TypeError(f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="f" {type(prompt)}.")elif isinstance(negative_prompt, str):uncond_tokens = [negative_prompt]elif batch_size != len(negative_prompt):raise ValueError(f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"f" {prompt} has batch size {batch_size}. 
Please make sure that passed `negative_prompt` matches"" the batch size of `prompt`.")else:uncond_tokens = negative_promptmax_length = prompt_embeds.shape[1]uncond_input = self.tokenizer(uncond_tokens,padding="max_length",max_length=max_length,truncation=True,return_tensors="pt",)if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = uncond_input.attention_mask.to(device)else:attention_mask = Nonenegative_prompt_embeds = self.text_encoder(uncond_input.input_ids.to(device),attention_mask=attention_mask,)negative_prompt_embeds = negative_prompt_embeds[0]if do_classifier_free_guidance:# duplicate unconditional embeddings for each generation per prompt, using mps friendly methodseq_len = negative_prompt_embeds.shape[1]negative_prompt_embeds = negative_prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)# For classifier free guidance, we need to do two forward passes.# Here we concatenate the unconditional and text embeddings into a single batch# to avoid doing two forward passesprompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])return prompt_embeds# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checkerdef run_safety_checker(self, image, device, dtype):if self.safety_checker is not None:safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)image, has_nsfw_concept = self.safety_checker(images=image, clip_input=safety_checker_input.pixel_values.to(dtype))else:has_nsfw_concept = Nonereturn image, has_nsfw_concept# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latentsdef decode_latents(self, latents):latents = 1 / self.vae.config.scaling_factor * latentsimage = self.vae.decode(latents).sampleimage = (image / 2 + 0.5).clamp(0, 1)# we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16image = image.cpu().permute(0, 2, 3, 1).float().numpy()return image# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargsdef prepare_extra_step_kwargs(self, generator, eta):# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502# and should be between [0, 1]accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())extra_step_kwargs = {}if accepts_eta:extra_step_kwargs["eta"] = eta# check if the scheduler accepts generatoraccepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())if accepts_generator:extra_step_kwargs["generator"] = generatorreturn extra_step_kwargsdef check_inputs(self,prompt,height,width,callback_steps,negative_prompt=None,prompt_embeds=None,negative_prompt_embeds=None,):if height % 8 != 0 or width % 8 != 0:raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")if (callback_steps is None) or (callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)):raise ValueError(f"`callback_steps` has to be a positive integer 
but is {callback_steps} of type"f" {type(callback_steps)}.")if prompt is not None and prompt_embeds is not None:raise ValueError(f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"" only forward one of the two.")elif prompt is None and prompt_embeds is None:raise ValueError("Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined.")elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")if negative_prompt is not None and negative_prompt_embeds is not None:raise ValueError(f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"f" {negative_prompt_embeds}. Please make sure to only forward one of the two.")if prompt_embeds is not None and negative_prompt_embeds is not None:if prompt_embeds.shape != negative_prompt_embeds.shape:raise ValueError("`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"f" {negative_prompt_embeds.shape}.")# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latentsdef prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):shape = (batch_size, num_channels_latents, height // self.vae_scale_factor, width // self.vae_scale_factor)if isinstance(generator, list) and len(generator) != batch_size:raise ValueError(f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"f" size of {batch_size}. Make sure the batch size matches the length of the generators.")if latents is None:latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)else:latents = latents.to(device)# scale the initial noise by the standard deviation required by the schedulerlatents = latents * self.scheduler.init_noise_sigmareturn latents@torch.no_grad()@replace_example_docstring(EXAMPLE_DOC_STRING)def __call__(self,processors: List[ControlNetProcessor],prompt: Union[str, List[str]] = None,height: Optional[int] = None,width: Optional[int] = None,num_inference_steps: int = 50,guidance_scale: float = 7.5,negative_prompt: Optional[Union[str, List[str]]] = None,num_images_per_prompt: Optional[int] = 1,eta: float = 0.0,generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,latents: Optional[torch.FloatTensor] = None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,output_type: Optional[str] = "pil",return_dict: bool = True,callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,callback_steps: int = 1,cross_attention_kwargs: Optional[Dict[str, Any]] = None,):r"""Function invoked when calling the pipeline for generation.Args:prompt (`str` or `List[str]`, *optional*):The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`.instead.height (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The height in pixels of the generated image.width (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The width in pixels of the generated image.num_inference_steps (`int`, *optional*, defaults to 50):The number of denoising steps. 
More denoising steps usually lead to a higher quality image at theexpense of slower inference.guidance_scale (`float`, *optional*, defaults to 7.5):Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).`guidance_scale` is defined as `w` of equation 2. of [ImagenPaper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,usually at the expense of lower image quality.negative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).num_images_per_prompt (`int`, *optional*, defaults to 1):The number of images to generate per prompt.eta (`float`, *optional*, defaults to 0.0):Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to[`schedulers.DDIMScheduler`], will be ignored for others.generator (`torch.Generator` or `List[torch.Generator]`, *optional*):One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)to make generation deterministic.latents (`torch.FloatTensor`, *optional*):Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for imagegeneration. Can be used to tweak the same generation with different prompts. If not provided, a latentstensor will ge generated by sampling using the supplied random `generator`.prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument.output_type (`str`, *optional*, defaults to `"pil"`):The output format of the generate image. Choose between[PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.return_dict (`bool`, *optional*, defaults to `True`):Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of aplain tuple.callback (`Callable`, *optional*):A function that will be called every `callback_steps` steps during inference. The function will becalled with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.callback_steps (`int`, *optional*, defaults to 1):The frequency at which the `callback` function will be called. 
If not specified, the callback will becalled at every step.cross_attention_kwargs (`dict`, *optional*):A kwargs dictionary that if specified is passed along to the `AttnProcessor` as defined under`self.processor` in[diffusers.cross_attention](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/cross_attention.py).Examples:Returns:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple.When returning a tuple, the first element is a list with the generated images, and the second element is alist of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work"(nsfw) content, according to the `safety_checker`."""# 0. Default height and width to unetheight, width = processors[0].default_height_width(height, width)# 1. Check inputs. Raise error if not correctself.check_inputs(prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds)for processor in processors:processor.check_inputs(prompt, prompt_embeds)# 2. Define call parametersif prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]device = self._execution_device# here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)# of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`# corresponds to doing no classifier free guidance.do_classifier_free_guidance = guidance_scale > 1.0# 3. Encode input promptprompt_embeds = self._encode_prompt(prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt,prompt_embeds=prompt_embeds,negative_prompt_embeds=negative_prompt_embeds,)# 4. Prepare imagefor processor in processors:processor.prepare_image(width=width,height=height,batch_size=batch_size * num_images_per_prompt,num_images_per_prompt=num_images_per_prompt,device=device,do_classifier_free_guidance=do_classifier_free_guidance,)# 5. Prepare timestepsself.scheduler.set_timesteps(num_inference_steps, device=device)timesteps = self.scheduler.timesteps# 6. Prepare latent variablesnum_channels_latents = self.unet.in_channelslatents = self.prepare_latents(batch_size * num_images_per_prompt,num_channels_latents,height,width,prompt_embeds.dtype,device,generator,latents,)# 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipelineextra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)# 8. 
Denoising loopnum_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.orderwith self.progress_bar(total=num_inference_steps) as progress_bar:for i, t in enumerate(timesteps):# expand the latents if we are doing classifier free guidancelatent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latentslatent_model_input = self.scheduler.scale_model_input(latent_model_input, t)# controlnet inferencefor i, processor in enumerate(processors):down_samples, mid_sample = processor(latent_model_input,t,encoder_hidden_states=prompt_embeds,return_dict=False,)if i == 0:down_block_res_samples, mid_block_res_sample = down_samples, mid_sampleelse:down_block_res_samples = [d_prev + d_curr for d_prev, d_curr in zip(down_block_res_samples, down_samples)]mid_block_res_sample = mid_block_res_sample + mid_sample# predict the noise residualnoise_pred = self.unet(latent_model_input,t,encoder_hidden_states=prompt_embeds,cross_attention_kwargs=cross_attention_kwargs,down_block_additional_residuals=down_block_res_samples,mid_block_additional_residual=mid_block_res_sample,).sample# perform guidanceif do_classifier_free_guidance:noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)# compute the previous noisy sample x_t -> x_t-1latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample# call the callback, if providedif i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):progress_bar.update()if callback is not None and i % callback_steps == 0:callback(i, t, latents)# If we do sequential model offloading, let's offload unet and controlnet# manually for max memory savingsif hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:self.unet.to("cpu")torch.cuda.empty_cache()if output_type == "latent":image = latentshas_nsfw_concept = Noneelif output_type == "pil":# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)# 10. Convert to PILimage = self.numpy_to_pil(image)else:# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)# Offload last model to CPUif hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:self.final_offload_hook.offload()if not return_dict:return (image, has_nsfw_concept)return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)# demo & simple test
def main():from diffusers.utils import load_imagepipe = StableDiffusionMultiControlNetPipeline.from_pretrained("./model", safety_checker=None, torch_dtype=torch.float16).to("cuda")pipe.enable_xformers_memory_efficient_attention()controlnet_canny = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny",cache_dir='./controlmodel', torch_dtype=torch.float16).to("cuda")controlnet_pose = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose",cache_dir='./controlmodel', torch_dtype=torch.float16).to("cuda")canny_left = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_left.png")canny_right = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_right.png")pose_right = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/pose_right.png")image = pipe(prompt="best quality, extremely detailed",negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",processors=[ControlNetProcessor(controlnet_canny, canny_left),ControlNetProcessor(controlnet_canny, canny_right),],generator=torch.Generator(device="cpu").manual_seed(0),num_inference_steps=30,width=512,height=512,).images[0]image.save("./canny_left_right.png")image = pipe(prompt="best quality, extremely detailed",negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",processors=[ControlNetProcessor(controlnet_canny, canny_left,0.5),ControlNetProcessor(controlnet_pose, pose_right,1.6),],generator=torch.Generator(device="cpu").manual_seed(0),num_inference_steps=30,width=512,height=512,).images[0]image.save("./canny_left_pose_right.png")if __name__ == "__main__":main()
How to implement and add features that diffusers does not yet provide
Suppose we want to implement Stable Diffusion + ControlNet + inpainting ourselves. How would we do it? Most of the task is chaining steps together in the generation flow, so the code mainly lives in the stable_diffusion pipeline module.
Suppose our implementation looks like the following. (The code works against a specific diffusers version and will be updated later, so there is no rush to use it. The point here is to explain the diffusers code framework and how a modified module can be integrated into diffusers for your own use.)
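Because the file below uses relative imports (for example `from ...models import AutoencoderKL, UNet2DConditionModel`), the simplest way to integrate it is to drop it into your local diffusers source tree and export the new class. The steps sketched here are one possible way to do that; the install path is an assumption about a typical environment, and the checkpoint name comes from the docstring example inside the file itself.
# One possible way to wire the new pipeline into a local diffusers install (assumed layout):
#   1. copy pipeline_stable_diffusion_controlnet_inpaint.py into
#      <site-packages>/diffusers/pipelines/stable_diffusion/
#   2. add an export to that sub-package's __init__.py:
#      from .pipeline_stable_diffusion_controlnet_inpaint import StableDiffusionControlNetInpaintPipeline
# After that the class can be used like any built-in pipeline:
from diffusers.pipelines.stable_diffusion import StableDiffusionControlNetInpaintPipeline

pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
    "takuma104/control_sd15_canny",  # a checkpoint bundling a controlnet, as in the file's docstring example
    safety_checker=None,
).to("cuda")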
# Code file: pipeline_stable_diffusion_controlnet_inpaint.py
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
from typing import Any, Callable, Dict, List, Optional, Union

import numpy as np
import PIL.Image
import torch
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer

from ...models import AutoencoderKL, UNet2DConditionModel
from ...schedulers import KarrasDiffusionSchedulers
from ...utils import is_accelerate_available, logging, randn_tensor, replace_example_docstring
from ..pipeline_utils import DiffusionPipeline
from . import StableDiffusionPipelineOutput
from .safety_checker import StableDiffusionSafetyCheckerlogger = logging.get_logger(__name__) # pylint: disable=invalid-nameEXAMPLE_DOC_STRING = """Examples:```py>>> from diffusers import StableDiffusionControlNetPipeline>>> from diffusers.utils import load_image>>> # Canny edged image for control>>> canny_edged_image = load_image(... "https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_canny_edged.png"... )>>> pipe = StableDiffusionControlNetPipeline.from_pretrained("takuma104/control_sd15_canny").to("cuda")>>> image = pipe(prompt="best quality, extremely detailed", controlnet_hint=canny_edged_image).images[0]```
"""def prepare_mask_and_masked_image(image, mask):"""Prepares a pair (image, mask) to be consumed by the Stable Diffusion pipeline. This means that those inputs will beconverted to ``torch.Tensor`` with shapes ``batch x channels x height x width`` where ``channels`` is ``3`` for the``image`` and ``1`` for the ``mask``.The ``image`` will be converted to ``torch.float32`` and normalized to be in ``[-1, 1]``. The ``mask`` will bebinarized (``mask > 0.5``) and cast to ``torch.float32`` too.Args:image (Union[np.array, PIL.Image, torch.Tensor]): The image to inpaint.It can be a ``PIL.Image``, or a ``height x width x 3`` ``np.array`` or a ``channels x height x width````torch.Tensor`` or a ``batch x channels x height x width`` ``torch.Tensor``.mask (_type_): The mask to apply to the image, i.e. regions to inpaint.It can be a ``PIL.Image``, or a ``height x width`` ``np.array`` or a ``1 x height x width````torch.Tensor`` or a ``batch x 1 x height x width`` ``torch.Tensor``.Raises:ValueError: ``torch.Tensor`` images should be in the ``[-1, 1]`` range. ValueError: ``torch.Tensor`` maskshould be in the ``[0, 1]`` range. ValueError: ``mask`` and ``image`` should have the same spatial dimensions.TypeError: ``mask`` is a ``torch.Tensor`` but ``image`` is not(ot the other way around).Returns:tuple[torch.Tensor]: The pair (mask, masked_image) as ``torch.Tensor`` with 4dimensions: ``batch x channels x height x width``."""if isinstance(image, torch.Tensor):if not isinstance(mask, torch.Tensor):raise TypeError(f"`image` is a torch.Tensor but `mask` (type: {type(mask)} is not")# Batch single imageif image.ndim == 3:assert image.shape[0] == 3, "Image outside a batch should be of shape (3, H, W)"image = image.unsqueeze(0)# Batch and add channel dim for single maskif mask.ndim == 2:mask = mask.unsqueeze(0).unsqueeze(0)# Batch single mask or add channel dimif mask.ndim == 3:# Single batched mask, no channel dim or single mask not batched but channel dimif mask.shape[0] == 1:mask = mask.unsqueeze(0)# Batched masks no channel dimelse:mask = mask.unsqueeze(1)assert image.ndim == 4 and mask.ndim == 4, "Image and Mask must have 4 dimensions"assert image.shape[-2:] == mask.shape[-2:], "Image and Mask must have the same spatial dimensions"assert image.shape[0] == mask.shape[0], "Image and Mask must have the same batch size"# Check image is in [-1, 1]if image.min() < -1 or image.max() > 1:raise ValueError("Image should be in [-1, 1] range")# Check mask is in [0, 1]if mask.min() < 0 or mask.max() > 1:raise ValueError("Mask should be in [0, 1] range")# Binarize maskmask[mask < 0.5] = 0mask[mask >= 0.5] = 1# Image as float32image = image.to(dtype=torch.float32)elif isinstance(mask, torch.Tensor):raise TypeError(f"`mask` is a torch.Tensor but `image` (type: {type(image)} is not")else:# preprocess imageif isinstance(image, (PIL.Image.Image, np.ndarray)):image = [image]if isinstance(image, list) and isinstance(image[0], PIL.Image.Image):image = [np.array(i.convert("RGB"))[None, :] for i in image]image = np.concatenate(image, axis=0)elif isinstance(image, list) and isinstance(image[0], np.ndarray):image = np.concatenate([i[None, :] for i in image], axis=0)image = image.transpose(0, 3, 1, 2)image = torch.from_numpy(image).to(dtype=torch.float32) / 127.5 - 1.0# preprocess maskif isinstance(mask, (PIL.Image.Image, np.ndarray)):mask = [mask]if isinstance(mask, list) and isinstance(mask[0], PIL.Image.Image):mask = np.concatenate([np.array(m.convert("L"))[None, None, :] for m in mask], axis=0)mask = mask.astype(np.float32) / 255.0elif 
isinstance(mask, list) and isinstance(mask[0], np.ndarray):mask = np.concatenate([m[None, None, :] for m in mask], axis=0)mask[mask < 0.5] = 0mask[mask >= 0.5] = 1mask = torch.from_numpy(mask)masked_image = image * (mask < 0.5)return mask, masked_imageclass StableDiffusionControlNetInpaintPipeline(DiffusionPipeline):r"""Pipeline for text-to-image generation using Stable Diffusion with ControlNet guidance.This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods thelibrary implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)Args:vae ([`AutoencoderKL`]):Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.text_encoder ([`CLIPTextModel`]):Frozen text-encoder. Stable Diffusion uses the text portion of[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specificallythe [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.tokenizer (`CLIPTokenizer`):Tokenizer of class[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.controlnet ([`UNet2DConditionModel`]):[ControlNet](https://arxiv.org/abs/2302.05543) architecture to generate guidance.scheduler ([`SchedulerMixin`]):A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].safety_checker ([`StableDiffusionSafetyChecker`]):Classification module that estimates whether generated images could be considered offensive or harmful.Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.feature_extractor ([`CLIPFeatureExtractor`]):Model that extracts features from generated images to be used as inputs for the `safety_checker`."""def __init__(self,vae: AutoencoderKL,text_encoder: CLIPTextModel,tokenizer: CLIPTokenizer,unet: UNet2DConditionModel,controlnet: UNet2DConditionModel,scheduler: KarrasDiffusionSchedulers,safety_checker: StableDiffusionSafetyChecker,feature_extractor: CLIPFeatureExtractor,requires_safety_checker: bool = True,):super().__init__()self.register_modules(vae=vae,text_encoder=text_encoder,tokenizer=tokenizer,unet=unet,controlnet=controlnet,scheduler=scheduler,safety_checker=safety_checker,feature_extractor=feature_extractor,)self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)self.register_to_config(requires_safety_checker=requires_safety_checker)def enable_vae_slicing(self):r"""Enable sliced VAE decoding.When this option is enabled, the VAE will split the input tensor in slices to compute decoding in severalsteps. This is useful to save some memory and allow larger batch sizes."""self.vae.enable_slicing()def disable_vae_slicing(self):r"""Disable sliced VAE decoding. If `enable_vae_slicing` was previously invoked, this method will go back tocomputing decoding in one step."""self.vae.disable_slicing()def enable_sequential_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, significantly reducing memory usage. 
When called, unet,text_encoder, vae and safety checker have their state dicts saved to CPU and then are moved to a`torch.device('meta') and loaded to GPU only when their specific submodule has its `forward` method called."""if is_accelerate_available():from accelerate import cpu_offloadelse:raise ImportError("Please install accelerate via `pip install accelerate`")device = torch.device(f"cuda:{gpu_id}")for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae]:cpu_offload(cpu_offloaded_model, device)if self.safety_checker is not None:cpu_offload(self.safety_checker, execution_device=device, offload_buffers=True)@propertydef _execution_device(self):r"""Returns the device on which the pipeline's models will be executed. After calling`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's modulehooks."""if self.device != torch.device("meta") or not hasattr(self.unet, "_hf_hook"):return self.devicefor module in self.unet.modules():if (hasattr(module, "_hf_hook")and hasattr(module._hf_hook, "execution_device")and module._hf_hook.execution_device is not None):return torch.device(module._hf_hook.execution_device)return self.devicedef _encode_prompt(self,prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt=None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,):r"""Encodes the prompt into text encoder hidden states.Args:prompt (`str` or `List[str]`, *optional*):prompt to be encodeddevice: (`torch.device`):torch devicenum_images_per_prompt (`int`):number of images that should be generated per promptdo_classifier_free_guidance (`bool`):whether to use classifier free guidance or notnegative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. 
                If not provided, negative_prompt_embeds will be generated from `negative_prompt` input argument.
        """
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        if prompt_embeds is None:
            text_inputs = self.tokenizer(
                prompt,
                padding="max_length",
                max_length=self.tokenizer.model_max_length,
                truncation=True,
                return_tensors="pt",
            )
            text_input_ids = text_inputs.input_ids
            untruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids

            if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(
                text_input_ids, untruncated_ids
            ):
                removed_text = self.tokenizer.batch_decode(untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1])
                logger.warning(
                    "The following part of your input was truncated because CLIP can only handle sequences up to"
                    f" {self.tokenizer.model_max_length} tokens: {removed_text}"
                )

            if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:
                attention_mask = text_inputs.attention_mask.to(device)
            else:
                attention_mask = None

            prompt_embeds = self.text_encoder(
                text_input_ids.to(device),
                attention_mask=attention_mask,
            )
            prompt_embeds = prompt_embeds[0]

        prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)

        bs_embed, seq_len, _ = prompt_embeds.shape
        # duplicate text embeddings for each generation per prompt, using mps friendly method
        prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
        prompt_embeds = prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1)

        # get unconditional embeddings for classifier free guidance
        if do_classifier_free_guidance and negative_prompt_embeds is None:
            uncond_tokens: List[str]
            if negative_prompt is None:
                uncond_tokens = [""] * batch_size
            elif type(prompt) is not type(negative_prompt):
                raise TypeError(
                    f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="
                    f" {type(prompt)}."
                )
            elif isinstance(negative_prompt, str):
                uncond_tokens = [negative_prompt]
            elif batch_size != len(negative_prompt):
                raise ValueError(
                    f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"
                    f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"
                    " the batch size of `prompt`."
                )
            else:
                uncond_tokens = negative_prompt

            max_length = prompt_embeds.shape[1]
            uncond_input = self.tokenizer(
                uncond_tokens,
                padding="max_length",
                max_length=max_length,
                truncation=True,
                return_tensors="pt",
            )

            if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:
                attention_mask = uncond_input.attention_mask.to(device)
            else:
                attention_mask = None

            negative_prompt_embeds = self.text_encoder(
                uncond_input.input_ids.to(device),
                attention_mask=attention_mask,
            )
            negative_prompt_embeds = negative_prompt_embeds[0]

        if do_classifier_free_guidance:
            # duplicate unconditional embeddings for each generation per prompt, using mps friendly method
            seq_len = negative_prompt_embeds.shape[1]

            negative_prompt_embeds = negative_prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)
            negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)
            negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)

            # For classifier free guidance, we need to do two forward passes.
            # Here we concatenate the unconditional and text embeddings into a single batch
            # to avoid doing two forward passes
            prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])

        return prompt_embeds

    def run_safety_checker(self, image, device, dtype):
        if self.safety_checker is not None:
            safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)
            image, has_nsfw_concept = self.safety_checker(
                images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
            )
        else:
            has_nsfw_concept = None
        return image, has_nsfw_concept

    def decode_latents(self, latents):
        latents = 1 / self.vae.config.scaling_factor * latents
        image = self.vae.decode(latents).sample
        image = (image / 2 + 0.5).clamp(0, 1)
        # we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        return image

    def prepare_extra_step_kwargs(self, generator, eta):
        # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
        # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
        # eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
        # and should be between [0, 1]
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        # check if the scheduler accepts generator
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        return extra_step_kwargs

    def check_inputs(
        self,
        prompt,
        height,
        width,
        callback_steps,
        negative_prompt=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
    ):
        if height % 8 != 0 or width % 8 != 0:
            raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")

        if (callback_steps is None) or (
            callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
        ):
            raise ValueError(
                f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                f" {type(callback_steps)}."
            )

        if prompt is not None and prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
                " only forward one of the two."
            )
        elif prompt is None and prompt_embeds is None:
            raise ValueError(
                "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
            )
        elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        if negative_prompt is not None and negative_prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        if prompt_embeds is not None and negative_prompt_embeds is not None:
            if prompt_embeds.shape != negative_prompt_embeds.shape:
                raise ValueError(
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {negative_prompt_embeds.shape}."
                )

    def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        shape = (batch_size, num_channels_latents, height // self.vae_scale_factor, width // self.vae_scale_factor)
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            latents = latents.to(device)

        # scale the initial noise by the standard deviation required by the scheduler
        latents = latents * self.scheduler.init_noise_sigma
        return latents

    def controlnet_hint_conversion(self, controlnet_hint, height, width, num_images_per_prompt):
        channels = 3
        if isinstance(controlnet_hint, torch.Tensor):
            # torch.Tensor: acceptable shapes are any of chw, bchw(b==1) or bchw(b==num_images_per_prompt)
            shape_chw = (channels, height, width)
            shape_bchw = (1, channels, height, width)
            shape_nchw = (num_images_per_prompt, channels, height, width)
            if controlnet_hint.shape in [shape_chw, shape_bchw, shape_nchw]:
                controlnet_hint = controlnet_hint.to(dtype=self.controlnet.dtype, device=self.controlnet.device)
                if controlnet_hint.shape != shape_nchw:
                    controlnet_hint = controlnet_hint.repeat(num_images_per_prompt, 1, 1, 1)
                return controlnet_hint
            else:
                raise ValueError(
                    f"Acceptable shape of `controlnet_hint` are any of ({channels}, {height}, {width}),"
                    + f" (1, {channels}, {height}, {width}) or ({num_images_per_prompt}, "
                    + f"{channels}, {height}, {width}) but is {controlnet_hint.shape}"
                )
        elif isinstance(controlnet_hint, np.ndarray):
            # np.ndarray: acceptable shapes are any of hw, hwc, bhwc(b==1) or bhwc(b==num_images_per_prompt)
            # hwc is an OpenCV compatible image format; the color channel order must be BGR.
            if controlnet_hint.shape == (height, width):
                controlnet_hint = np.repeat(controlnet_hint[:, :, np.newaxis], channels, axis=2)  # hw -> hwc(c==3)
            shape_hwc = (height, width, channels)
            shape_bhwc = (1, height, width, channels)
            shape_nhwc = (num_images_per_prompt, height, width, channels)
            if controlnet_hint.shape in [shape_hwc, shape_bhwc, shape_nhwc]:
                controlnet_hint = torch.from_numpy(controlnet_hint.copy())
                controlnet_hint = controlnet_hint.to(dtype=self.controlnet.dtype, device=self.controlnet.device)
                controlnet_hint /= 255.0
                if controlnet_hint.shape != shape_nhwc:
                    controlnet_hint = controlnet_hint.repeat(num_images_per_prompt, 1, 1, 1)
                controlnet_hint = controlnet_hint.permute(0, 3, 1, 2)  # b h w c -> b c h w
                return controlnet_hint
            else:
                raise ValueError(
                    f"Acceptable shape of `controlnet_hint` are any of ({height}, {width}), "
                    + f"({height}, {width}, {channels}), "
                    + f"(1, {height}, {width}, {channels}) or "
                    + f"({num_images_per_prompt}, {channels}, {height}, {width}) but is {controlnet_hint.shape}"
                )
        elif isinstance(controlnet_hint, PIL.Image.Image):
            if controlnet_hint.size == (width, height):
                controlnet_hint = controlnet_hint.convert("RGB")  # make sure 3 channel RGB format
                controlnet_hint = np.array(controlnet_hint)  # to numpy
                controlnet_hint = controlnet_hint[:, :, ::-1]  # RGB -> BGR
                return self.controlnet_hint_conversion(controlnet_hint, height, width, num_images_per_prompt)
            else:
                raise ValueError(
                    f"Acceptable image size of `controlnet_hint` is ({width}, {height}) but is {controlnet_hint.size}"
                )
        else:
            raise ValueError(
                f"Acceptable type of `controlnet_hint` are any of torch.Tensor, np.ndarray, PIL.Image.Image but is"
                f" {type(controlnet_hint)}"
            )

    def prepare_mask_latents(
        self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance
    ):
        # resize the mask to latents shape as we concatenate the mask to the latents
        # we do that before converting to dtype to avoid breaking in case we're using cpu_offload
        # and half precision
        mask = torch.nn.functional.interpolate(
            mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor)
        )
        mask = mask.to(device=device, dtype=dtype)

        masked_image = masked_image.to(device=device, dtype=dtype)

        # encode the mask image into latents space so we can concatenate it to the latents
        if isinstance(generator, list):
            masked_image_latents = [
                self.vae.encode(masked_image[i : i + 1]).latent_dist.sample(generator=generator[i])
                for i in range(batch_size)
            ]
            masked_image_latents = torch.cat(masked_image_latents, dim=0)
        else:
            masked_image_latents = self.vae.encode(masked_image).latent_dist.sample(generator=generator)
        masked_image_latents = self.vae.config.scaling_factor * masked_image_latents

        # duplicate mask and masked_image_latents for each generation per prompt, using mps friendly method
        if mask.shape[0] < batch_size:
            if not batch_size % mask.shape[0] == 0:
                raise ValueError(
                    "The passed mask and the required batch size don't match. Masks are supposed to be duplicated to"
                    f" a total batch size of {batch_size}, but {mask.shape[0]} masks were passed. Make sure the number"
                    " of masks that you pass is divisible by the total requested batch size."
                )
            mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)
        if masked_image_latents.shape[0] < batch_size:
            if not batch_size % masked_image_latents.shape[0] == 0:
                raise ValueError(
                    "The passed images and the required batch size don't match. Images are supposed to be duplicated"
                    f" to a total batch size of {batch_size}, but {masked_image_latents.shape[0]} images were passed."
                    " Make sure the number of images that you pass is divisible by the total requested batch size."
                )
            masked_image_latents = masked_image_latents.repeat(batch_size // masked_image_latents.shape[0], 1, 1, 1)

        mask = torch.cat([mask] * 2) if do_classifier_free_guidance else mask
        masked_image_latents = (
            torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents
        )

        # aligning device to prevent device errors when concating it with the latent model input
        masked_image_latents = masked_image_latents.to(device=device, dtype=dtype)
        return mask, masked_image_latents

    @torch.no_grad()
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    def __call__(
        self,
        prompt: Union[str, List[str]] = None,
        height: Optional[int] = None,
        width: Optional[int] = None,
        num_inference_steps: int = 50,
        guidance_scale: float = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.FloatTensor] = None,
        prompt_embeds: Optional[torch.FloatTensor] = None,
        negative_prompt_embeds: Optional[torch.FloatTensor] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        controlnet_hint: Optional[Union[torch.FloatTensor, np.ndarray, PIL.Image.Image]] = None,
        image: Union[torch.FloatTensor, PIL.Image.Image] = None,
        mask_image: Union[torch.FloatTensor, PIL.Image.Image] = None,
    ):
        r"""
        Function invoked when calling the pipeline for generation.

        Args:
            prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`
                instead.
            height (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):
                The height in pixels of the generated image.
            width (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):
                The width in pixels of the generated image.
            num_inference_steps (`int`, *optional*, defaults to 50):
                The number of denoising steps. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            guidance_scale (`float`, *optional*, defaults to 7.5):
                Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
                `guidance_scale` is defined as `w` of equation 2. of [Imagen
                Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >
                1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,
                usually at the expense of lower image quality.
            negative_prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts not to guide the image generation. If not defined, one has to pass
                `negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale`
                is less than `1`).
            num_images_per_prompt (`int`, *optional*, defaults to 1):
                The number of images to generate per prompt.
            eta (`float`, *optional*, defaults to 0.0):
                Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to
                [`schedulers.DDIMScheduler`], will be ignored for others.
            generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
                One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)
                to make generation deterministic.
            latents (`torch.FloatTensor`, *optional*):
                Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image
                generation. Can be used to tweak the same generation with different prompts. If not provided, a
                latents tensor will be generated by sampling using the supplied random `generator`.
            prompt_embeds (`torch.FloatTensor`, *optional*):
                Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If
                not provided, text embeddings will be generated from `prompt` input argument.
            negative_prompt_embeds (`torch.FloatTensor`, *optional*):
                Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt
                weighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` input
                argument.
            output_type (`str`, *optional*, defaults to `"pil"`):
                The output format of the generated image. Choose between
                [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
                plain tuple.
            callback (`Callable`, *optional*):
                A function that will be called every `callback_steps` steps during inference. The function will be
                called with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.
            callback_steps (`int`, *optional*, defaults to 1):
                The frequency at which the `callback` function will be called. If not specified, the callback will be
                called at every step.
            cross_attention_kwargs (`dict`, *optional*):
                A kwargs dictionary that if specified is passed along to the `AttnProcessor` as defined under
                `self.processor` in
                [diffusers.cross_attention](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/cross_attention.py).
            controlnet_hint (`torch.FloatTensor`, `np.ndarray` or `PIL.Image.Image`, *optional*):
                ControlNet input embedding. ControlNet generates guidances using this input embedding. If the type is
                specified as `torch.FloatTensor`, it is passed to ControlNet as is. If the type is `np.ndarray`, it is
                assumed to be an OpenCV compatible image format. `PIL.Image.Image` can also be accepted as an image.
                The size of all these types must correspond to the output image size.
            image (`torch.FloatTensor` or `PIL.Image.Image`):
                The original image to be inpainted; together with `mask_image` it is converted into the mask and
                masked-image inputs expected by the inpainting UNet.
            mask_image (`torch.FloatTensor` or `PIL.Image.Image`):
                The inpainting mask. Non-zero (white) regions are repainted, zero (black) regions are kept.

        Examples:

        Returns:
            [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:
            [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a
            `tuple`. When returning a tuple, the first element is a list with the generated images, and the second
            element is a list of `bool`s denoting whether the corresponding generated image likely represents
            "not-safe-for-work" (nsfw) content, according to the `safety_checker`.
        """
        # 0. Default height and width to unet
        height = height or self.unet.config.sample_size * self.vae_scale_factor
        width = width or self.unet.config.sample_size * self.vae_scale_factor

        # 1. Control embedding check & conversion
        if controlnet_hint is not None:
            controlnet_hint = self.controlnet_hint_conversion(controlnet_hint, height, width, num_images_per_prompt)

        # 2. Check inputs. Raise error if not correct
        self.check_inputs(
            prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds
        )

        # 3. Define call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        device = self._execution_device
        # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)
        # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
        # corresponds to doing no classifier free guidance.
        do_classifier_free_guidance = guidance_scale > 1.0

        # 4. Encode input prompt
        prompt_embeds = self._encode_prompt(
            prompt,
            device,
            num_images_per_prompt,
            do_classifier_free_guidance,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
        )

        # prepare the inpainting mask and the masked image from the user inputs
        mask, masked_image = prepare_mask_and_masked_image(image, mask_image)

        # 5. Prepare timesteps
        self.scheduler.set_timesteps(num_inference_steps, device=device)
        timesteps = self.scheduler.timesteps

        # 6. Prepare latent variables
        # use the VAE latent channel count here (usually 4); the inpainting UNet's `in_channels`
        # additionally includes the mask and masked-image channels that are concatenated below
        num_channels_latents = self.vae.config.latent_channels
        latents = self.prepare_latents(
            batch_size * num_images_per_prompt,
            num_channels_latents,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            latents,
        )

        mask, masked_image_latents = self.prepare_mask_latents(
            mask,
            masked_image,
            batch_size * num_images_per_prompt,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            do_classifier_free_guidance,
        )
        num_channels_mask = mask.shape[1]
        num_channels_masked_image = masked_image_latents.shape[1]

        # sanity check: latent + mask + masked image channels must match the UNet input channels
        if num_channels_latents + num_channels_mask + num_channels_masked_image != self.unet.config.in_channels:
            raise ValueError(
                f"Incorrect configuration settings! The config of `pipeline.unet` expects"
                f" {self.unet.config.in_channels} input channels but received `num_channels_latents`:"
                f" {num_channels_latents} + `num_channels_mask`: {num_channels_mask} +"
                f" `num_channels_masked_image`: {num_channels_masked_image}. Please verify the config of"
                " `pipeline.unet` or your `mask_image` or `image` input."
            )

        # 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 8. Denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                # expand the latents if we are doing classifier free guidance
                latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

                if controlnet_hint is not None:
                    # run ControlNet to get the residual features that will be injected into the UNet blocks
                    control = self.controlnet(
                        latent_model_input, t, encoder_hidden_states=prompt_embeds, controlnet_hint=controlnet_hint
                    )
                    control = [item for item in control]
                    # concatenate mask and masked-image latents for the inpainting UNet
                    latent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)
                    noise_pred = self.unet(
                        latent_model_input,
                        t,
                        encoder_hidden_states=prompt_embeds,
                        cross_attention_kwargs=cross_attention_kwargs,
                        control=control,
                    ).sample
                else:
                    # predict the noise residual without ControlNet guidance
                    latent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)
                    noise_pred = self.unet(
                        latent_model_input,
                        t,
                        encoder_hidden_states=prompt_embeds,
                        cross_attention_kwargs=cross_attention_kwargs,
                    ).sample

                # perform guidance
                if do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

                # compute the previous noisy sample x_t -> x_t-1
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample

                # call the callback, if provided
                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    if callback is not None and i % callback_steps == 0:
                        callback(i, t, latents)

        if output_type == "latent":
            image = latents
            has_nsfw_concept = None
        elif output_type == "pil":
            # 9. Post-processing
            image = self.decode_latents(latents)

            # 10. Run safety checker
            image, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)

            # 11. Convert to PIL
            image = self.numpy_to_pil(image)
        else:
            # 9. Post-processing
            image = self.decode_latents(latents)

            # 10. Run safety checker
            image, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)

        if not return_dict:
            return (image, has_nsfw_concept)

        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
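上面这段代码把 ControlNet 的控制输入(controlnet_hint)和 inpainting 的 mask、masked-image 拼接逻辑合进了同一个 __call__。把这个类注册进 diffusers 之后(放置和注册方法见下文),调用方式大致如下。注意这只是一个示意写法:类名 StableDiffusionControlNetInpaintPipeline、基座权重 runwayml/stable-diffusion-inpainting(inpaint 版 UNet 的输入是 9 通道:latent+mask+masked image)以及本地图片路径都是假设,以你实际注册的类名和自己的素材为准。
import torch
from diffusers import ControlNetModel, StableDiffusionControlNetInpaintPipeline
from diffusers.utils import load_image

# 加载 controlnet 和带 inpainting UNet 的基座模型(权重名仅为示例)
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", cache_dir='./')
pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
    "runwayml/stable-diffusion-inpainting", controlnet=controlnet, safety_checker=None
).to('cuda')

# 原图、重绘区域 mask(白色为需要重绘的区域)、controlnet 控制图(这里以 canny 边界图为例)
init_image = load_image("input.png")
mask_image = load_image("mask.png")
hint_image = load_image("canny_hint.png")

generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(
    prompt="best quality, extremely detailed",
    negative_prompt="lowres, bad anatomy, worst quality, low quality",
    image=init_image,
    mask_image=mask_image,
    controlnet_hint=hint_image,
    num_inference_steps=30,
    generator=generator,
).images[0]
image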
把写好的 pipeline 文件放置在 diffusers 的源码目录下(和已有的 pipeline 文件放在一起)。
放置路径及其上层包里的每个 __init__.py 都需要把新加的类添加进去(导出),否则 import 的时候找不到这个类。
添加方式可以参考这些 __init__.py 里已有类的写法,一般如下:
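下面给一个示意性的写法(假设新文件名为 pipeline_stable_diffusion_controlnet_inpaint.py、类名为 StableDiffusionControlNetInpaintPipeline,两者都只是举例,以你实际的文件名和类名为准),从最内层的包到 diffusers 顶层逐级导出:
# 示意:src/diffusers/pipelines/stable_diffusion/__init__.py
from .pipeline_stable_diffusion_controlnet_inpaint import StableDiffusionControlNetInpaintPipeline

# 示意:src/diffusers/pipelines/__init__.py
from .stable_diffusion import StableDiffusionControlNetInpaintPipeline

# 示意:src/diffusers/__init__.py
from .pipelines import StableDiffusionControlNetInpaintPipeline
如果 diffusers 是以源码方式安装的,改完后在源码根目录执行一次 pip install -e . 让修改生效,之后就可以直接 from diffusers import 新类名 来使用了。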