Create a dedicated progress state class

This commit is contained in:
Simon Sawicki 2024-12-27 16:17:45 +01:00
parent d6a9a93252
commit e63878e235
No known key found for this signature in database
4 changed files with 45 additions and 35 deletions

View File

@ -157,7 +157,7 @@ from .utils import (
write_json_file, write_json_file,
write_string, write_string,
) )
from .utils._utils import _UnsafeExtensionError, _YDLLogger from .utils._utils import _UnsafeExtensionError, _YDLLogger, _ProgressState
from .utils.networking import ( from .utils.networking import (
HTTPHeaderDict, HTTPHeaderDict,
clean_headers, clean_headers,
@ -955,14 +955,18 @@ class YoutubeDL:
self._write_string(code, self._out_files.console) self._write_string(code, self._out_files.console)
return True return True
def to_console_title(self, message): def to_console_title(self, message=None, progress_state=None, percent=None):
if not self.params.get('consoletitle'): if not self.params.get('consoletitle'):
return return
message = remove_terminal_sequences(message)
if not self._send_console_code(f'\033]0;{message}\007'): if message:
if os.name == 'nt' and ctypes.windll.kernel32.GetConsoleWindow(): success = self._send_console_code(f'\033]0;{remove_terminal_sequences(message)}\007')
if not success and os.name == 'nt' and ctypes.windll.kernel32.GetConsoleWindow():
ctypes.windll.kernel32.SetConsoleTitleW(message) ctypes.windll.kernel32.SetConsoleTitleW(message)
if isinstance(progress_state, _ProgressState):
self._send_console_code(progress_state.get_ansi_escape(percent))
def save_console_title(self): def save_console_title(self):
if not self.params.get('consoletitle') or self.params.get('simulate'): if not self.params.get('consoletitle') or self.params.get('simulate'):
return return
@ -975,10 +979,7 @@ class YoutubeDL:
def __enter__(self): def __enter__(self):
self.save_console_title() self.save_console_title()
if self.params.get('consoletitle'): self.to_console_title(progress_state=_ProgressState.INDETERMINATE)
# Set progress to "indeterminate"
# See: https://conemu.github.io/en/AnsiEscapeCodes.html#ConEmu_specific_OSC
self._send_console_code('\033]9;4;3;0\007')
return self return self
def save_cookies(self): def save_cookies(self):
@ -987,10 +988,7 @@ class YoutubeDL:
def __exit__(self, *args): def __exit__(self, *args):
self.restore_console_title() self.restore_console_title()
if self.params.get('consoletitle'): self.to_console_title(progress_state=_ProgressState.HIDDEN)
# Set progress to "disabled"
# See: https://conemu.github.io/en/AnsiEscapeCodes.html#ConEmu_specific_OSC
self._send_console_code('\033]9;4;0;0\007')
self.close() self.close()
def close(self): def close(self):

View File

@ -31,6 +31,7 @@ from ..utils import (
timetuple_from_msec, timetuple_from_msec,
try_call, try_call,
) )
from ..utils._utils import _ProgressState
class FileDownloader: class FileDownloader:
@ -333,17 +334,7 @@ class FileDownloader:
progress_dict), s.get('progress_idx') or 0) progress_dict), s.get('progress_idx') or 0)
self.to_console_title(self.ydl.evaluate_outtmpl( self.to_console_title(self.ydl.evaluate_outtmpl(
progress_template.get('download-title') or 'yt-dlp %(progress._default_template)s', progress_template.get('download-title') or 'yt-dlp %(progress._default_template)s',
progress_dict)) progress_dict), _ProgressState.from_dict(s), s.get('_progress'))
percent = s.get('_percent')
if s['status'] not in ('downloading', 'error', 'finished') or percent is None:
return
# Emit ConEmu progress codes: https://conemu.github.io/en/AnsiEscapeCodes.html#ConEmu_specific_OSC
if s['status'] == 'finished':
self.ydl._send_console_code('\033]9;4;3;0\007')
return
state = 1 if s['status'] == 'downloading' else 2
self.ydl._send_console_code(f'\033]9;4;{state};{int(percent)}\007')
def _format_progress(self, *args, **kwargs): def _format_progress(self, *args, **kwargs):
return self.ydl._format_text( return self.ydl._format_text(

View File

@ -10,6 +10,7 @@ from ..utils import (
_configuration_args, _configuration_args,
deprecation_warning, deprecation_warning,
) )
from ..utils._utils import _ProgressState
class PostProcessorMetaClass(type): class PostProcessorMetaClass(type):
@ -189,17 +190,7 @@ class PostProcessor(metaclass=PostProcessorMetaClass):
self._downloader.to_console_title(self._downloader.evaluate_outtmpl( self._downloader.to_console_title(self._downloader.evaluate_outtmpl(
progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s', progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s',
progress_dict)) progress_dict), _ProgressState.from_dict(s), s.get('_progress'))
percent = s.get('_percent')
if s['status'] not in ('downloading', 'error', 'finished') or percent is None:
return
# Emit ConEmu progress codes: https://conemu.github.io/en/AnsiEscapeCodes.html#ConEmu_specific_OSC
if s['status'] == 'finished':
self._downloader._send_console_code('\033]9;4;3;0\007')
return
state = 1 if s['status'] == 'downloading' else 2
self._downloader._send_console_code(f'\033]9;4;{state};{int(percent)}\007')
def _retry_download(self, err, count, retries): def _retry_download(self, err, count, retries):
# While this is not an extractor, it behaves similar to one and # While this is not an extractor, it behaves similar to one and

View File

@ -8,6 +8,7 @@ import contextlib
import datetime as dt import datetime as dt
import email.header import email.header
import email.utils import email.utils
import enum
import errno import errno
import functools import functools
import hashlib import hashlib
@ -5656,3 +5657,32 @@ class _YDLLogger:
def stderr(self, message): def stderr(self, message):
if self._ydl: if self._ydl:
self._ydl.to_stderr(message) self._ydl.to_stderr(message)
class _ProgressState(enum.Enum):
"""
Represents a state for a progress bar.
See: https://conemu.github.io/en/AnsiEscapeCodes.html#ConEmu_specific_OSC
"""
HIDDEN = 0
INDETERMINATE = 3
VISIBLE = 1
WARNING = 4
ERROR = 2
@classmethod
def from_dict(cls, s, /):
if s['status'] == 'finished':
return cls.INDETERMINATE
# Not currently used
if s['status'] == 'error':
return cls.ERROR
return cls.INDETERMINATE if s.get('_percent') is None else cls.VISIBLE
def get_ansi_escape(self, /, percent=None):
percent = 0 if percent is None else int(percent)
return f'\033]9;4;{self.value};{percent}\007'