aboutsummaryrefslogtreecommitdiffstats
path: root/modules/deepbooru.py
diff options
context:
space:
mode:
Diffstat (limited to 'modules/deepbooru.py')
-rw-r--r--modules/deepbooru.py173
1 files changed, 173 insertions, 0 deletions
diff --git a/modules/deepbooru.py b/modules/deepbooru.py
new file mode 100644
index 00000000..4ad334a1
--- /dev/null
+++ b/modules/deepbooru.py
@@ -0,0 +1,173 @@
+import os.path
+from concurrent.futures import ProcessPoolExecutor
+import multiprocessing
+import time
+import re
+
+re_special = re.compile(r'([\\()])')
+
+def get_deepbooru_tags(pil_image):
+ """
+ This method is for running only one image at a time for simple use. Used to the img2img interrogate.
+ """
+ from modules import shared # prevents circular reference
+
+ try:
+ create_deepbooru_process(shared.opts.interrogate_deepbooru_score_threshold, create_deepbooru_opts())
+ return get_tags_from_process(pil_image)
+ finally:
+ release_process()
+
+
+OPT_INCLUDE_RANKS = "include_ranks"
+def create_deepbooru_opts():
+ from modules import shared
+
+ return {
+ "use_spaces": shared.opts.deepbooru_use_spaces,
+ "use_escape": shared.opts.deepbooru_escape,
+ "alpha_sort": shared.opts.deepbooru_sort_alpha,
+ OPT_INCLUDE_RANKS: shared.opts.interrogate_return_ranks,
+ }
+
+
+def deepbooru_process(queue, deepbooru_process_return, threshold, deepbooru_opts):
+ model, tags = get_deepbooru_tags_model()
+ while True: # while process is running, keep monitoring queue for new image
+ pil_image = queue.get()
+ if pil_image == "QUIT":
+ break
+ else:
+ deepbooru_process_return["value"] = get_deepbooru_tags_from_model(model, tags, pil_image, threshold, deepbooru_opts)
+
+
+def create_deepbooru_process(threshold, deepbooru_opts):
+ """
+ Creates deepbooru process. A queue is created to send images into the process. This enables multiple images
+ to be processed in a row without reloading the model or creating a new process. To return the data, a shared
+ dictionary is created to hold the tags created. To wait for tags to be returned, a value of -1 is assigned
+ to the dictionary and the method adding the image to the queue should wait for this value to be updated with
+ the tags.
+ """
+ from modules import shared # prevents circular reference
+ shared.deepbooru_process_manager = multiprocessing.Manager()
+ shared.deepbooru_process_queue = shared.deepbooru_process_manager.Queue()
+ shared.deepbooru_process_return = shared.deepbooru_process_manager.dict()
+ shared.deepbooru_process_return["value"] = -1
+ shared.deepbooru_process = multiprocessing.Process(target=deepbooru_process, args=(shared.deepbooru_process_queue, shared.deepbooru_process_return, threshold, deepbooru_opts))
+ shared.deepbooru_process.start()
+
+
+def get_tags_from_process(image):
+ from modules import shared
+
+ shared.deepbooru_process_return["value"] = -1
+ shared.deepbooru_process_queue.put(image)
+ while shared.deepbooru_process_return["value"] == -1:
+ time.sleep(0.2)
+ caption = shared.deepbooru_process_return["value"]
+ shared.deepbooru_process_return["value"] = -1
+
+ return caption
+
+
+def release_process():
+ """
+ Stops the deepbooru process to return used memory
+ """
+ from modules import shared # prevents circular reference
+ shared.deepbooru_process_queue.put("QUIT")
+ shared.deepbooru_process.join()
+ shared.deepbooru_process_queue = None
+ shared.deepbooru_process = None
+ shared.deepbooru_process_return = None
+ shared.deepbooru_process_manager = None
+
+def get_deepbooru_tags_model():
+ import deepdanbooru as dd
+ import tensorflow as tf
+ import numpy as np
+ this_folder = os.path.dirname(__file__)
+ model_path = os.path.abspath(os.path.join(this_folder, '..', 'models', 'deepbooru'))
+ if not os.path.exists(os.path.join(model_path, 'project.json')):
+ # there is no point importing these every time
+ import zipfile
+ from basicsr.utils.download_util import load_file_from_url
+ load_file_from_url(
+ r"https://github.com/KichangKim/DeepDanbooru/releases/download/v3-20211112-sgd-e28/deepdanbooru-v3-20211112-sgd-e28.zip",
+ model_path)
+ with zipfile.ZipFile(os.path.join(model_path, "deepdanbooru-v3-20211112-sgd-e28.zip"), "r") as zip_ref:
+ zip_ref.extractall(model_path)
+ os.remove(os.path.join(model_path, "deepdanbooru-v3-20211112-sgd-e28.zip"))
+
+ tags = dd.project.load_tags_from_project(model_path)
+ model = dd.project.load_model_from_project(
+ model_path, compile_model=False
+ )
+ return model, tags
+
+
+def get_deepbooru_tags_from_model(model, tags, pil_image, threshold, deepbooru_opts):
+ import deepdanbooru as dd
+ import tensorflow as tf
+ import numpy as np
+
+ alpha_sort = deepbooru_opts['alpha_sort']
+ use_spaces = deepbooru_opts['use_spaces']
+ use_escape = deepbooru_opts['use_escape']
+ include_ranks = deepbooru_opts['include_ranks']
+
+ width = model.input_shape[2]
+ height = model.input_shape[1]
+ image = np.array(pil_image)
+ image = tf.image.resize(
+ image,
+ size=(height, width),
+ method=tf.image.ResizeMethod.AREA,
+ preserve_aspect_ratio=True,
+ )
+ image = image.numpy() # EagerTensor to np.array
+ image = dd.image.transform_and_pad_image(image, width, height)
+ image = image / 255.0
+ image_shape = image.shape
+ image = image.reshape((1, image_shape[0], image_shape[1], image_shape[2]))
+
+ y = model.predict(image)[0]
+
+ result_dict = {}
+
+ for i, tag in enumerate(tags):
+ result_dict[tag] = y[i]
+
+ unsorted_tags_in_theshold = []
+ result_tags_print = []
+ for tag in tags:
+ if result_dict[tag] >= threshold:
+ if tag.startswith("rating:"):
+ continue
+ unsorted_tags_in_theshold.append((result_dict[tag], tag))
+ result_tags_print.append(f'{result_dict[tag]} {tag}')
+
+ # sort tags
+ result_tags_out = []
+ sort_ndx = 0
+ if alpha_sort:
+ sort_ndx = 1
+
+ # sort by reverse by likelihood and normal for alpha, and format tag text as requested
+ unsorted_tags_in_theshold.sort(key=lambda y: y[sort_ndx], reverse=(not alpha_sort))
+ for weight, tag in unsorted_tags_in_theshold:
+ # note: tag_outformat will still have a colon if include_ranks is True
+ tag_outformat = tag.replace(':', ' ')
+ if use_spaces:
+ tag_outformat = tag_outformat.replace('_', ' ')
+ if use_escape:
+ tag_outformat = re.sub(re_special, r'\\\1', tag_outformat)
+ if include_ranks:
+ tag_outformat = f"({tag_outformat}:{weight:.3f})"
+
+ result_tags_out.append(tag_outformat)
+
+ print('\n'.join(sorted(result_tags_print, reverse=True)))
+
+ return ', '.join(result_tags_out)