
"""
Defines internal helper methods for handling transformers and diffusers pipelines.
These are used by the load_from_pipeline method in pipelines.py.
"""

from typing import Any, Dict, Optional

from PIL import Image

from gradio import components


def handle_transformers_pipeline(pipeline: Any) -> Optional[Dict[str, Any]]:
    try:
        import transformers
    except ImportError as ie:
        raise ImportError(
            "transformers not installed. Please try `pip install transformers`"
        ) from ie

    def is_transformers_pipeline_type(pipeline, class_name: str):
        cls = getattr(transformers, class_name, None)
        return cls and isinstance(pipeline, cls)

    # Handle the different pipelines. The getattr() check in is_transformers_pipeline_type
    # makes sure the pipeline class exists in the version of the transformers library
    # that the user has installed.
    if is_transformers_pipeline_type(pipeline, "AudioClassificationPipeline"):
        return {
            "inputs": components.Audio(
                sources=["microphone"],
                type="filepath",
                label="Input",
                render=False,
            ),
            "outputs": components.Label(label="Class", render=False),
            "preprocess": lambda i: {"inputs": i},
            "postprocess": lambda r: {i["label"].split(", ")[0]: i["score"] for i in r},
        }
    if is_transformers_pipeline_type(pipeline, "AutomaticSpeechRecognitionPipeline"):
        return {
            "inputs": components.Audio(
                sources=["microphone"], type="filepath", label="Input", render=False
            ),
            "outputs": components.Textbox(label="Output", render=False),
            "preprocess": lambda i: {"inputs": i},
            "postprocess": lambda r: r["text"],
        }
    if is_transformers_pipeline_type(pipeline, "FeatureExtractionPipeline"):
        return {
            "inputs": components.Textbox(label="Input", render=False),
            "outputs": components.Dataframe(label="Output", render=False),
            "preprocess": lambda x: {"inputs": x},
            "postprocess": lambda r: r[0],
        }
    if is_transformers_pipeline_type(pipeline, "FillMaskPipeline"):
        return {
            "inputs": components.Textbox(label="Input", render=False),
            "outputs": components.Label(label="Classification", render=False),
            "preprocess": lambda x: {"inputs": x},
            "postprocess": lambda r: {i["token_str"]: i["score"] for i in r},
        }
    if is_transformers_pipeline_type(pipeline, "ImageClassificationPipeline"):
        return {
            "inputs": components.Image(
                type="filepath", label="Input Image", render=False
            ),
            "outputs": components.Label(label="Classification", render=False),
            "preprocess": lambda i: {"images": i},
            "postprocess": lambda r: {i["label"].split(", ")[0]: i["score"] for i in r},
        }
    if is_transformers_pipeline_type(pipeline, "QuestionAnsweringPipeline"):
        return {
            "inputs": [
                components.Textbox(lines=7, label="Context", render=False),
                components.Textbox(label="Question", render=False),
            ],
            "outputs": [
                components.Textbox(label="Answer", render=False),
                components.Label(label="Score", render=False),
            ],
            "preprocess": lambda c, q: {"context": c, "question": q},
            "postprocess": lambda r: (r["answer"], r["score"]),
        }
    if is_transformers_pipeline_type(pipeline, "SummarizationPipeline"):
        return {
            "inputs": components.Textbox(lines=7, label="Input", render=False),
            "outputs": components.Textbox(label="Summary", render=False),
            "preprocess": lambda x: {"inputs": x},
            "postprocess": lambda r: r[0]["summary_text"],
        }
    if is_transformers_pipeline_type(pipeline, "TextClassificationPipeline"):
        return {
            "inputs": components.Textbox(label="Input", render=False),
            "outputs": components.Label(label="Classification", render=False),
            "preprocess": lambda x: [x],
            "postprocess": lambda r: {i["label"].split(", ")[0]: i["score"] for i in r},
        }
    if is_transformers_pipeline_type(pipeline, "TextGenerationPipeline"):
        return {
            "inputs": components.Textbox(label="Input", render=False),
            "outputs": components.Textbox(label="Output", render=False),
            "preprocess": lambda x: {"text_inputs": x},
            "postprocess": lambda r: r[0]["generated_text"],
        }
    if is_transformers_pipeline_type(pipeline, "TranslationPipeline"):
        return {
            "inputs": components.Textbox(label="Input", render=False),
            "outputs": components.Textbox(label="Translation", render=False),
            "preprocess": lambda x: [x],
            "postprocess": lambda r: r[0]["translation_text"],
        }
    if is_transformers_pipeline_type(pipeline, "Text2TextGenerationPipeline"):
        return {
            "inputs": components.Textbox(label="Input", render=False),
            "outputs": components.Textbox(label="Generated Text", render=False),
            "preprocess": lambda x: [x],
            "postprocess": lambda r: r[0]["generated_text"],
        }
    if is_transformers_pipeline_type(pipeline, "ZeroShotClassificationPipeline"):
        return {
            "inputs": [
                components.Textbox(label="Input", render=False),
                components.Textbox(
                    label="Possible class names (comma-separated)", render=False
                ),
                components.Checkbox(label="Allow multiple true classes", render=False),
            ],
            "outputs": components.Label(label="Classification", render=False),
            "preprocess": lambda i, c, m: {
                "sequences": i,
                "candidate_labels": c,
                "multi_label": m,
            },
            "postprocess": lambda r: {
                r["labels"][i]: r["scores"][i] for i in range(len(r["labels"]))
            },
        }
    if is_transformers_pipeline_type(pipeline, "DocumentQuestionAnsweringPipeline"):
        return {
            "inputs": [
                components.Image(type="filepath", label="Input Document", render=False),
                components.Textbox(label="Question", render=False),
            ],
            "outputs": components.Label(label="Label", render=False),
            "preprocess": lambda img, q: {"image": img, "question": q},
            "postprocess": lambda r: {i["answer"]: i["score"] for i in r},
        }
    if is_transformers_pipeline_type(pipeline, "VisualQuestionAnsweringPipeline"):
        return {
            "inputs": [
                components.Image(type="filepath", label="Input Image", render=False),
                components.Textbox(label="Question", render=False),
            ],
            "outputs": components.Label(label="Score", render=False),
            "preprocess": lambda img, q: {"image": img, "question": q},
            "postprocess": lambda r: {i["answer"]: i["score"] for i in r},
        }
    if is_transformers_pipeline_type(pipeline, "ImageToTextPipeline"):
        return {
            "inputs": components.Image(
                type="filepath", label="Input Image", render=False
            ),
            "outputs": components.Textbox(label="Text", render=False),
            "preprocess": lambda i: {"images": i},
            "postprocess": lambda r: r[0]["generated_text"],
        }
    if is_transformers_pipeline_type(pipeline, "ObjectDetectionPipeline"):
        return {
            "inputs": components.Image(
                type="filepath", label="Input Image", render=False
            ),
            "outputs": components.AnnotatedImage(
                label="Objects Detected", render=False
            ),
            "preprocess": lambda i: {"inputs": i},
            "postprocess": lambda r, img: (
                img,
                [
                    (
                        (
                            i["box"]["xmin"],
                            i["box"]["ymin"],
                            i["box"]["xmax"],
                            i["box"]["ymax"],
                        ),
                        i["label"],
                    )
                    for i in r
                ],
            ),
        }
    raise ValueError(f"Unsupported transformers pipeline type: {type(pipeline)}")


def handle_diffusers_pipeline(pipeline: Any) -> Optional[Dict[str, Any]]:
    try:
        import diffusers
    except ImportError as ie:
        raise ImportError(
            "diffusers not installed. Please try `pip install diffusers`"
        ) from ie

    def is_diffusers_pipeline_type(pipeline, class_name: str):
        cls = getattr(diffusers, class_name, None)
        return cls and isinstance(pipeline, cls)

    if is_diffusers_pipeline_type(pipeline, "StableDiffusionPipeline"):
        return {
            "inputs": [
                components.Textbox(label="Prompt", render=False),
                components.Textbox(label="Negative prompt", render=False),
                components.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=500,
                    value=50,
                    step=1,
                ),
                components.Slider(
                    label="Guidance scale",
                    minimum=1,
                    maximum=20,
                    value=7.5,
                    step=0.5,
                ),
            ],
            "outputs": components.Image(
                label="Generated Image", render=False, type="pil"
            ),
            "preprocess": lambda prompt, n_prompt, num_inf_steps, g_scale: {
                "prompt": prompt,
                "negative_prompt": n_prompt,
                "num_inference_steps": num_inf_steps,
                "guidance_scale": g_scale,
            },
            "postprocess": lambda r: r["images"][0],
        }
    if is_diffusers_pipeline_type(pipeline, "StableDiffusionImg2ImgPipeline"):
        return {
            "inputs": [
                components.Textbox(label="Prompt", render=False),
                components.Textbox(label="Negative prompt", render=False),
                components.Image(type="filepath", label="Image", render=False),
                components.Slider(
                    label="Strength", minimum=0, maximum=1, value=0.8, step=0.1
                ),
                components.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=500,
                    value=50,
                    step=1,
                ),
                components.Slider(
                    label="Guidance scale",
                    minimum=1,
                    maximum=20,
                    value=7.5,
                    step=0.5,
                ),
            ],
            "outputs": components.Image(
                label="Generated Image", render=False, type="pil"
            ),
            "preprocess": lambda prompt,
            n_prompt,
            image,
            strength,
            num_inf_steps,
            g_scale: {
                "prompt": prompt,
                "image": Image.open(image).resize((768, 768)),
                "negative_prompt": n_prompt,
                "num_inference_steps": num_inf_steps,
                "guidance_scale": g_scale,
                "strength": strength,
            },
            "postprocess": lambda r: r["images"][0],
        }
    if is_diffusers_pipeline_type(pipeline, "StableDiffusionInpaintPipeline"):
        return {
            "inputs": [
                components.Textbox(label="Prompt", render=False),
                components.Textbox(label="Negative prompt", render=False),
                components.Image(type="filepath", label="Image", render=False),
                components.Image(type="filepath", label="Mask Image", render=False),
                components.Slider(
                    label="Strength", minimum=0, maximum=1, value=0.8, step=0.1
                ),
                components.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=500,
                    value=50,
                    step=1,
                ),
                components.Slider(
                    label="Guidance scale",
                    minimum=1,
                    maximum=20,
                    value=7.5,
                    step=0.5,
                ),
            ],
            "outputs": components.Image(
                label="Generated Image", render=False, type="pil"
            ),
            "preprocess": lambda prompt,
            n_prompt,
            image,
            mask_image,
            strength,
            num_inf_steps,
            g_scale: {
                "prompt": prompt,
                "image": Image.open(image).resize((768, 768)),
                "mask_image": Image.open(mask_image).resize((768, 768)),
                "negative_prompt": n_prompt,
                "num_inference_steps": num_inf_steps,
                "guidance_scale": g_scale,
                "strength": strength,
            },
            "postprocess": lambda r: r["images"][0],
        }
    if is_diffusers_pipeline_type(pipeline, "StableDiffusionDepth2ImgPipeline"):
        return {
            "inputs": [
                components.Textbox(label="Prompt", render=False),
                components.Textbox(label="Negative prompt", render=False),
                components.Image(type="filepath", label="Image", render=False),
                components.Slider(
                    label="Strength", minimum=0, maximum=1, value=0.8, step=0.1
                ),
                components.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=500,
                    value=50,
                    step=1,
                ),
                components.Slider(
                    label="Guidance scale",
                    minimum=1,
                    maximum=20,
                    value=7.5,
                    step=0.5,
                ),
            ],
            "outputs": components.Image(
                label="Generated Image", render=False, type="pil"
            ),
            "preprocess": lambda prompt,
            n_prompt,
            image,
            strength,
            num_inf_steps,
            g_scale: {
                "prompt": prompt,
                "image": Image.open(image).resize((768, 768)),
                "negative_prompt": n_prompt,
                "num_inference_steps": num_inf_steps,
                "guidance_scale": g_scale,
                "strength": strength,
            },
            "postprocess": lambda r: r["images"][0],
        }
    if is_diffusers_pipeline_type(pipeline, "StableDiffusionImageVariationPipeline"):
        return {
            "inputs": [
                components.Image(type="filepath", label="Image", render=False),
                components.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=500,
                    value=50,
                    step=1,
                ),
                components.Slider(
                    label="Guidance scale",
                    minimum=1,
                    maximum=20,
                    value=7.5,
                    step=0.5,
                ),
            ],
            "outputs": components.Image(
                label="Generated Image", render=False, type="pil"
            ),
            "preprocess": lambda image, num_inf_steps, g_scale: {
                "image": Image.open(image).resize((768, 768)),
                "num_inference_steps": num_inf_steps,
                "guidance_scale": g_scale,
            },
            "postprocess": lambda r: r["images"][0],
        }
    if is_diffusers_pipeline_type(pipeline, "StableDiffusionInstructPix2PixPipeline"):
        return {
            "inputs": [
                components.Textbox(label="Prompt", render=False),
                components.Textbox(label="Negative prompt", render=False),
                components.Image(type="filepath", label="Image", render=False),
                components.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=500,
                    value=50,
                    step=1,
                ),
                components.Slider(
                    label="Guidance scale",
                    minimum=1,
                    maximum=20,
                    value=7.5,
                    step=0.5,
                ),
                components.Slider(
                    label="Image Guidance scale",
                    minimum=1,
                    maximum=5,
                    value=1.5,
                    step=0.5,
                ),
            ],
            "outputs": components.Image(
                label="Generated Image", render=False, type="pil"
            ),
            "preprocess": lambda prompt,
            n_prompt,
            image,
            num_inf_steps,
            g_scale,
            img_g_scale: {
                "prompt": prompt,
                "image": Image.open(image).resize((768, 768)),
                "negative_prompt": n_prompt,
                "num_inference_steps": num_inf_steps,
                "guidance_scale": g_scale,
                "image_guidance_scale": img_g_scale,
            },
            "postprocess": lambda r: r["images"][0],
        }
    if is_diffusers_pipeline_type(pipeline, "StableDiffusionUpscalePipeline"):
        return {
            "inputs": [
                components.Textbox(label="Prompt", render=False),
                components.Textbox(label="Negative prompt", render=False),
                components.Image(type="filepath", label="Image", render=False),
                components.Slider(
                    label="Number of inference steps",
                    minimum=1,
                    maximum=500,
                    value=50,
                    step=1,
                ),
                components.Slider(
                    label="Guidance scale",
                    minimum=1,
                    maximum=20,
                    value=7.5,
                    step=0.5,
                ),
                components.Slider(
                    label="Noise level", minimum=1, maximum=100, value=20, step=1
                ),
            ],
            "outputs": components.Image(
                label="Generated Image", render=False, type="pil"
            ),
            "preprocess": lambda prompt,
            n_prompt,
            image,
            num_inf_steps,
            g_scale,
            noise_level: {
                "prompt": prompt,
                "image": Image.open(image).resize((768, 768)),
                "negative_prompt": n_prompt,
                "num_inference_steps": num_inf_steps,
                "guidance_scale": g_scale,
                "noise_level": noise_level,
            },
            "postprocess": lambda r: r["images"][0],
        }
    raise ValueError(f"Unsupported diffusers pipeline type: {type(pipeline)}")