diff --git a/README.md b/README.md index 81442a70..ddf5f25d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,13 @@ -![alt text](images/python_logo.ico) +

+ + [CodingFleet Code Generator banner] + + [CodingFleet Code Converter banner] + +

+ + + # Python Code Tutorials This is a repository of all the tutorials of [The Python Code](https://www.thepythoncode.com) website. ## List of Tutorials @@ -178,6 +187,7 @@ This is a repository of all the tutorials of [The Python Code](https://www.thepy - [How to Query the Ethereum Blockchain with Python](https://www.thepythoncode.com/article/query-ethereum-blockchain-with-python). ([code](general/query-ethereum)) - [Data Cleaning with Pandas in Python](https://www.thepythoncode.com/article/data-cleaning-using-pandas-in-python). ([code](general/data-cleaning-pandas)) - [How to Minify CSS with Python](https://www.thepythoncode.com/article/minimize-css-files-in-python). ([code](general/minify-css)) + - [Build a real MCP client and server in Python with FastMCP (Todo Manager example)](https://www.thepythoncode.com/article/fastmcp-mcp-client-server-todo-manager). ([code](general/fastmcp-mcp-client-server-todo-manager)) @@ -200,6 +210,7 @@ This is a repository of all the tutorials of [The Python Code](https://www.thepy - [How to Extract Google Trends Data in Python](https://www.thepythoncode.com/article/extract-google-trends-data-in-python). ([code](web-scraping/extract-google-trends-data)) - [How to Make a YouTube Video Downloader in Python](https://www.thepythoncode.com/article/make-a-youtube-video-downloader-in-python). ([code](web-scraping/youtube-video-downloader)) - [How to Build a YouTube Audio Downloader in Python](https://www.thepythoncode.com/article/build-a-youtube-mp3-downloader-tkinter-python). ([code](web-scraping/youtube-mp3-downloader)) + - [YouTube Video Transcription Summarization with Python](https://thepythoncode.com/article/youtube-video-transcription-and-summarization-with-python). ([code](web-scraping/youtube-transcript-summarizer/)) - ### [Python Standard Library](https://www.thepythoncode.com/topic/python-standard-library) - [How to Transfer Files in the Network using Sockets in Python](https://www.thepythoncode.com/article/send-receive-files-using-sockets-python). ([code](general/transfer-files/)) @@ -285,6 +296,7 @@ This is a repository of all the tutorials of [The Python Code](https://www.thepy - [How to Compress Images in Python](https://www.thepythoncode.com/article/compress-images-in-python). ([code](python-for-multimedia/compress-image)) - [How to Remove Metadata from an Image in Python](https://thepythoncode.com/article/how-to-clear-image-metadata-in-python). ([code](python-for-multimedia/remove-metadata-from-images)) - [How to Create Videos from Images in Python](https://thepythoncode.com/article/create-a-video-from-images-opencv-python). ([code](python-for-multimedia/create-video-from-images)) + - [How to Recover Deleted Files with Python](https://thepythoncode.com/article/how-to-recover-deleted-file-with-python). ([code](python-for-multimedia/recover-deleted-files)) - ### [Web Programming](https://www.thepythoncode.com/topic/web-programming) - [Detecting Fraudulent Transactions in a Streaming Application using Kafka in Python](https://www.thepythoncode.com/article/detect-fraudulent-transactions-with-apache-kafka-in-python). 
([code](general/detect-fraudulent-transactions)) diff --git a/ethical-hacking/get-wifi-passwords/README.md b/ethical-hacking/get-wifi-passwords/README.md index e24eda7f..a10efc10 100644 --- a/ethical-hacking/get-wifi-passwords/README.md +++ b/ethical-hacking/get-wifi-passwords/README.md @@ -1 +1,3 @@ -# [How to Extract Saved WiFi Passwords in Python](https://www.thepythoncode.com/article/extract-saved-wifi-passwords-in-python) \ No newline at end of file +# [How to Extract Saved WiFi Passwords in Python](https://www.thepythoncode.com/article/extract-saved-wifi-passwords-in-python) + +This program lists saved Wi-Fi networks and their passwords on Windows and Linux machines. In addition to the SSID (Wi-Fi network name) and passwords, the output also shows the network’s security type and ciphers. \ No newline at end of file diff --git a/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py b/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py index 0afd70ca..ff32f6f8 100644 --- a/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py +++ b/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py @@ -28,10 +28,16 @@ def get_windows_saved_wifi_passwords(verbose=1): [list]: list of extracted profiles, a profile has the fields ["ssid", "ciphers", "key"] """ ssids = get_windows_saved_ssids() - Profile = namedtuple("Profile", ["ssid", "ciphers", "key"]) + Profile = namedtuple("Profile", ["ssid", "security", "ciphers", "key"]) profiles = [] for ssid in ssids: ssid_details = subprocess.check_output(f"""netsh wlan show profile "{ssid}" key=clear""").decode() + + #get the security type + security = re.findall(r"Authentication\s(.*)", ssid_details) + # clear spaces and colon + security = "/".join(dict.fromkeys(c.strip().strip(":").strip() for c in security)) + # get the ciphers ciphers = re.findall(r"Cipher\s(.*)", ssid_details) # clear spaces and colon @@ -43,7 +49,7 @@ def get_windows_saved_wifi_passwords(verbose=1): key = key[0].strip().strip(":").strip() except IndexError: key = "None" - profile = Profile(ssid=ssid, ciphers=ciphers, key=key) + profile = Profile(ssid=ssid, security=security, ciphers=ciphers, key=key) if verbose >= 1: print_windows_profile(profile) profiles.append(profile) @@ -52,12 +58,13 @@ def get_windows_saved_wifi_passwords(verbose=1): def print_windows_profile(profile): """Prints a single profile on Windows""" - print(f"{profile.ssid:25}{profile.ciphers:15}{profile.key:50}") + #print(f"{profile.ssid:25}{profile.ciphers:15}{profile.key:50}") + print(f"{profile.ssid:25}{profile.security:30}{profile.ciphers:35}{profile.key:50}") def print_windows_profiles(verbose): """Prints all extracted SSIDs along with Key on Windows""" - print("SSID CIPHER(S) KEY") + print("SSID Securities CIPHER(S) KEY") print("-"*50) get_windows_saved_wifi_passwords(verbose) diff --git a/ethical-hacking/http-security-headers/README.md b/ethical-hacking/http-security-headers/README.md new file mode 100644 index 00000000..e0e7b1d0 --- /dev/null +++ b/ethical-hacking/http-security-headers/README.md @@ -0,0 +1,2 @@ +Grab your API key from Open Router:- https://openrouter.ai/ +Model is Used is DeepSeek: DeepSeek V3.1 (free). However, feel free to try others. 
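For a quick sanity check of the new `http_security_headers.py` script (its diff follows), a typical run looks roughly like the sketch below. The environment variable, flags, and default model are taken from the script itself; the API key value and `example.com` are placeholders, and note that the script also imports `requests`, which is not listed in the new `requirements.txt`.

```bash
# Hypothetical usage sketch: install deps, set the OpenRouter key, then scan a site.
pip install openai requests
export OPENROUTER_API_KEY="sk-or-..."   # placeholder key obtained from https://openrouter.ai/
python http_security_headers.py example.com --export results.json
# A different model can be selected explicitly:
python http_security_headers.py https://example.com --model deepseek/deepseek-chat-v3.1:free
```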
\ No newline at end of file diff --git a/ethical-hacking/http-security-headers/http_security_headers.py b/ethical-hacking/http-security-headers/http_security_headers.py new file mode 100644 index 00000000..67b494c4 --- /dev/null +++ b/ethical-hacking/http-security-headers/http_security_headers.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +import requests +import json +import os +import argparse +from typing import Dict, List, Tuple +from openai import OpenAI + +class SecurityHeadersAnalyzer: + def __init__(self, api_key: str = None, base_url: str = None, model: str = None): + self.api_key = api_key or os.getenv('OPENROUTER_API_KEY') or os.getenv('OPENAI_API_KEY') + self.base_url = base_url or os.getenv('OPENROUTER_BASE_URL', 'https://openrouter.ai/api/v1') + self.model = model or os.getenv('LLM_MODEL', 'deepseek/deepseek-chat-v3.1:free') + + if not self.api_key: + raise ValueError("API key is required. Set OPENROUTER_API_KEY or provide --api-key") + + self.client = OpenAI(base_url=self.base_url, api_key=self.api_key) + + def fetch_headers(self, url: str, timeout: int = 10) -> Tuple[Dict[str, str], int]: + """Fetch HTTP headers from URL""" + if not url.startswith(('http://', 'https://')): + url = 'https://' + url + + try: + response = requests.get(url, timeout=timeout, allow_redirects=True) + return dict(response.headers), response.status_code + except requests.exceptions.RequestException as e: + print(f"Error fetching {url}: {e}") + return {}, 0 + + def analyze_headers(self, url: str, headers: Dict[str, str], status_code: int) -> str: + """Analyze headers using LLM""" + prompt = f"""Analyze the HTTP security headers for {url} (Status: {status_code}) + +Headers: +{json.dumps(headers, indent=2)} + +Provide a comprehensive security analysis including: +1. Security score (0-100) and overall assessment +2. Critical security issues that need immediate attention +3. Missing important security headers +4. Analysis of existing security headers and their effectiveness +5. Specific recommendations for improvement +6. Potential security risks based on current configuration + +Focus on practical, actionable advice following current web security best practices. Please do not include ** and # +in the response except for specific references where necessary. use numbers, romans, alphabets instead Format the response well please. 
""" + + try: + completion = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + temperature=0.2 + ) + return completion.choices[0].message.content + except Exception as e: + return f"Analysis failed: {e}" + + def analyze_url(https://github.com/self,%20url:%20str,%20timeout:%20int%20=%2010) -> Dict: + """Analyze a single URL""" + print(f"\nAnalyzing: {url}") + print("-" * 50) + + headers, status_code = self.fetch_headers(url, timeout) + if not headers: + return {"url": url, "error": "Failed to fetch headers"} + + print(f"Status Code: {status_code}") + print(f"\nHTTP Headers ({len(headers)} found):") + print("-" * 30) + for key, value in headers.items(): + print(f"{key}: {value}") + + print(f"\nAnalyzing with AI...") + analysis = self.analyze_headers(url, headers, status_code) + + print("\nSECURITY ANALYSIS") + print("=" * 50) + print(analysis) + + return { + "url": url, + "status_code": status_code, + "headers_count": len(headers), + "analysis": analysis, + "raw_headers": headers + } + + def analyze_multiple_urls(self, urls: List[str], timeout: int = 10) -> List[Dict]: + """Analyze multiple URLs""" + results = [] + for i, url in enumerate(urls, 1): + print(f"\n[{i}/{len(urls)}]") + result = self.analyze_url(https://github.com/url,%20timeout) + results.append(result) + return results + + def export_results(self, results: List[Dict], filename: str): + """Export results to JSON""" + with open(filename, 'w') as f: + json.dump(results, f, indent=2, ensure_ascii=False) + print(f"\nResults exported to: {filename}") + +def main(): + parser = argparse.ArgumentParser( + description='Analyze HTTP security headers using AI', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog='''Examples: + python security_headers.py https://example.com + python security_headers.py example.com google.com + python security_headers.py example.com --export results.json + +Environment Variables: + OPENROUTER_API_KEY - API key for OpenRouter + OPENAI_API_KEY - API key for OpenAI + LLM_MODEL - Model to use (default: deepseek/deepseek-chat-v3.1:free)''' + ) + + parser.add_argument('urls', nargs='+', help='URLs to analyze') + parser.add_argument('--api-key', help='API key for LLM service') + parser.add_argument('--base-url', help='Base URL for LLM API') + parser.add_argument('--model', help='LLM model to use') + parser.add_argument('--timeout', type=int, default=10, help='Request timeout (default: 10s)') + parser.add_argument('--export', help='Export results to JSON file') + + args = parser.parse_args() + + try: + analyzer = SecurityHeadersAnalyzer( + api_key=args.api_key, + base_url=args.base_url, + model=args.model + ) + + results = analyzer.analyze_multiple_urls(args.urls, args.timeout) + + if args.export: + analyzer.export_results(results, args.export) + + except ValueError as e: + print(f"Error: {e}") + return 1 + except KeyboardInterrupt: + print("\nAnalysis interrupted by user") + return 1 + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/ethical-hacking/http-security-headers/requirements.txt b/ethical-hacking/http-security-headers/requirements.txt new file mode 100644 index 00000000..f0dd0aec --- /dev/null +++ b/ethical-hacking/http-security-headers/requirements.txt @@ -0,0 +1 @@ +openai \ No newline at end of file diff --git a/general/fastmcp-mcp-client-server-todo-manager/README.md b/general/fastmcp-mcp-client-server-todo-manager/README.md new file mode 100644 index 00000000..dd988428 --- /dev/null 
+++ b/general/fastmcp-mcp-client-server-todo-manager/README.md @@ -0,0 +1,39 @@ +# Build a real MCP client and server in Python with FastMCP (Todo Manager example) + +This folder contains the code that accompanies the article: + +- Article: https://www.thepythoncode.com/article/fastmcp-mcp-client-server-todo-manager + +What’s included +- `todo_server.py`: FastMCP MCP server exposing tools, resources, and a prompt for a Todo Manager. +- `todo_client_test.py`: A small client script that connects to the server and exercises all features. +- `requirements.txt`: Python dependencies for this tutorial. + +Quick start +1) Install requirements +```bash +python -m venv .venv && source .venv/bin/activate # or use your preferred env manager +pip install -r requirements.txt +``` + +2) Run the server (stdio transport by default) +```bash +python todo_server.py +``` + +3) In a separate terminal, run the client +```bash +python todo_client_test.py +``` + +Optional: run the server over HTTP +- In `todo_server.py`, replace the last line with: +```python +mcp.run(transport="http", host="127.0.0.1", port=8000) +``` +- Then change the client constructor to `Client("http://127.0.0.1:8000/mcp")`. + +Notes +- Requires Python 3.10+. +- The example uses in-memory storage for simplicity. +- For production tips (HTTPS, auth, containerization), see the article. diff --git a/general/fastmcp-mcp-client-server-todo-manager/requirements.txt b/general/fastmcp-mcp-client-server-todo-manager/requirements.txt new file mode 100644 index 00000000..2c9387f7 --- /dev/null +++ b/general/fastmcp-mcp-client-server-todo-manager/requirements.txt @@ -0,0 +1 @@ +fastmcp>=2.12 \ No newline at end of file diff --git a/general/fastmcp-mcp-client-server-todo-manager/todo_client_test.py b/general/fastmcp-mcp-client-server-todo-manager/todo_client_test.py new file mode 100644 index 00000000..f01a1e78 --- /dev/null +++ b/general/fastmcp-mcp-client-server-todo-manager/todo_client_test.py @@ -0,0 +1,50 @@ +import asyncio +from fastmcp import Client + +async def main(): + # Option A: Connect to local Python script (stdio) + client = Client("todo_server.py") + + # Option B: In-memory (for tests) + # from todo_server import mcp + # client = Client(mcp) + + async with client: + await client.ping() + print("[OK] Connected") + + # Create a few todos + t1 = await client.call_tool("create_todo", {"title": "Write README", "priority": "high"}) + t2 = await client.call_tool("create_todo", {"title": "Refactor utils", "description": "Split helpers into modules"}) + t3 = await client.call_tool("create_todo", {"title": "Add tests", "priority": "low"}) + print("Created IDs:", t1.data["id"], t2.data["id"], t3.data["id"]) + + # List open + open_list = await client.call_tool("list_todos", {"status": "open"}) + print("Open IDs:", [t["id"] for t in open_list.data["items"]]) + + # Complete one + updated = await client.call_tool("complete_todo", {"todo_id": t2.data["id"]}) + print("Completed:", updated.data["id"], "status:", updated.data["status"]) + + # Search + found = await client.call_tool("search_todos", {"query": "readme"}) + print("Search 'readme':", [t["id"] for t in found.data["items"]]) + + # Resources + stats = await client.read_resource("stats://todos") + print("Stats:", getattr(stats[0], "text", None) or stats[0]) + + todo2 = await client.read_resource(f"todo://{t2.data['id']}") + print("todo://{id}:", getattr(todo2[0], "text", None) or todo2[0]) + + # Prompt + prompt_msgs = await client.get_prompt("suggest_next_action", {"pending": 2, "project": 
"MCP tutorial"}) + msgs_pretty = [ + {"role": m.role, "content": getattr(m, "content", None) or getattr(m, "text", None)} + for m in getattr(prompt_msgs, "messages", []) + ] + print("Prompt messages:", msgs_pretty) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/general/fastmcp-mcp-client-server-todo-manager/todo_server.py b/general/fastmcp-mcp-client-server-todo-manager/todo_server.py new file mode 100644 index 00000000..64f99b73 --- /dev/null +++ b/general/fastmcp-mcp-client-server-todo-manager/todo_server.py @@ -0,0 +1,88 @@ +from typing import Literal +from itertools import count +from datetime import datetime, timezone +from fastmcp import FastMCP + +# In-memory storage for demo purposes +TODOS: list[dict] = [] +_id = count(start=1) + +mcp = FastMCP(name="Todo Manager") + +@mcp.tool +def create_todo( + title: str, + description: str = "", + priority: Literal["low", "medium", "high"] = "medium", +) -> dict: + """Create a todo (id, title, status, priority, timestamps).""" + todo = { + "id": next(_id), + "title": title, + "description": description, + "priority": priority, + "status": "open", + "created_at": datetime.now(timezone.utc).isoformat(), + "completed_at": None, + } + TODOS.append(todo) + return todo + +@mcp.tool +def list_todos(status: Literal["open", "done", "all"] = "open") -> dict: + """List todos by status ('open' | 'done' | 'all').""" + if status == "all": + items = TODOS + elif status == "open": + items = [t for t in TODOS if t["status"] == "open"] + else: + items = [t for t in TODOS if t["status"] == "done"] + return {"items": items} + +@mcp.tool +def complete_todo(todo_id: int) -> dict: + """Mark a todo as done.""" + for t in TODOS: + if t["id"] == todo_id: + t["status"] = "done" + t["completed_at"] = datetime.now(timezone.utc).isoformat() + return t + raise ValueError(f"Todo {todo_id} not found") + +@mcp.tool +def search_todos(query: str) -> dict: + """Case-insensitive search in title/description.""" + q = query.lower().strip() + items = [t for t in TODOS if q in t["title"].lower() or q in t["description"].lower()] + return {"items": items} + +# Read-only resources +@mcp.resource("stats://todos") +def todo_stats() -> dict: + """Aggregated stats: total, open, done.""" + total = len(TODOS) + open_count = sum(1 for t in TODOS if t["status"] == "open") + done_count = total - open_count + return {"total": total, "open": open_count, "done": done_count} + +@mcp.resource("todo://{id}") +def get_todo(id: int) -> dict: + """Fetch a single todo by id.""" + for t in TODOS: + if t["id"] == id: + return t + raise ValueError(f"Todo {id} not found") + +# A reusable prompt +@mcp.prompt +def suggest_next_action(pending: int, project: str | None = None) -> str: + """Render a small instruction for an LLM to propose next action.""" + base = f"You have {pending} pending TODOs. " + if project: + base += f"They relate to the project '{project}'. " + base += "Suggest the most impactful next action in one short sentence." + return base + +if __name__ == "__main__": + # Default transport is stdio; you can also use transport="http", host=..., port=... 
+ mcp.run() diff --git a/general/interactive-weather-plot/interactive_weather_plot.py b/general/interactive-weather-plot/interactive_weather_plot.py index b4d17141..3d1ea566 100644 --- a/general/interactive-weather-plot/interactive_weather_plot.py +++ b/general/interactive-weather-plot/interactive_weather_plot.py @@ -68,7 +68,7 @@ def changeLocation(newLocation): # Making the Radio Buttons buttons = RadioButtons( ax=plt.axes([0.1, 0.1, 0.2, 0.2]), - labels=locations.keys() + labels=list(locations.keys()) ) # Connect click event on the buttons to the function that changes location. @@ -86,4 +86,4 @@ def changeLocation(newLocation): plt.savefig('file.svg', format='svg') -plt.show() \ No newline at end of file +plt.show() diff --git a/gui-programming/rich-text-editor/rich_text_editor.py b/gui-programming/rich-text-editor/rich_text_editor.py index 10c14263..05259905 100644 --- a/gui-programming/rich-text-editor/rich_text_editor.py +++ b/gui-programming/rich-text-editor/rich_text_editor.py @@ -112,9 +112,9 @@ def fileManager(event=None, action=None): document['tags'][tagName] = [] ranges = textArea.tag_ranges(tagName) - - for i, tagRange in enumerate(ranges[::2]): - document['tags'][tagName].append([str(tagRange), str(ranges[i+1])]) + + for i in range(0, len(ranges), 2): + document['tags'][tagName].append([str(ranges[i]), str(ranges[i + 1])]) if not filePath: # ask the user for a filename with the native file explorer. diff --git a/handling-pdf-files/pdf-compressor/README.md b/handling-pdf-files/pdf-compressor/README.md index 4527174c..307f105c 100644 --- a/handling-pdf-files/pdf-compressor/README.md +++ b/handling-pdf-files/pdf-compressor/README.md @@ -1,8 +1,48 @@ # [How to Compress PDF Files in Python](https://www.thepythoncode.com/article/compress-pdf-files-in-python) -To run this: -- `pip3 install -r requirements.txt` -- To compress `bert-paper.pdf` file: - ``` - $ python pdf_compressor.py bert-paper.pdf bert-paper-min.pdf - ``` - This will spawn a new compressed PDF file under the name `bert-paper-min.pdf`. + +This directory contains two approaches: + +- Legacy (commercial): `pdf_compressor.py` uses PDFTron/PDFNet. PDFNet now requires a license key and the old pip package is not freely available, so this may not work without a license. +- Recommended (open source): `pdf_compressor_ghostscript.py` uses Ghostscript to compress PDFs. + +## Ghostscript method (recommended) + +Prerequisite: Install Ghostscript + +- macOS (Homebrew): + - `brew install ghostscript` +- Ubuntu/Debian: + - `sudo apt-get update && sudo apt-get install -y ghostscript` +- Windows: + - Download and install from https://ghostscript.com/releases/ + - Ensure `gswin64c.exe` (or `gswin32c.exe`) is in your PATH. + +No Python packages are required for this method, only Ghostscript. + +### Usage + +To compress `bert-paper.pdf` into `bert-paper-min.pdf` with default quality (`power=2`): + +``` +python pdf_compressor_ghostscript.py bert-paper.pdf bert-paper-min.pdf +``` + +Optional quality level `[power]` controls compression/quality tradeoff (maps to Ghostscript `-dPDFSETTINGS`): + +- 0 = `/screen` (smallest, lowest quality) +- 1 = `/ebook` (good quality) +- 2 = `/printer` (high quality) [default] +- 3 = `/prepress` (very high quality) +- 4 = `/default` (Ghostscript default) + +Example: + +``` +python pdf_compressor_ghostscript.py bert-paper.pdf bert-paper-min.pdf 1 +``` + +In testing, `bert-paper.pdf` (~757 KB) compressed to ~407 KB with `power=1`. 
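Under the hood, `pdf_compressor_ghostscript.py` (full script in the diff below) simply shells out to Ghostscript. A roughly equivalent manual command, assuming `power=1` (which maps to `/ebook`) and the same example file names, would be:

```bash
# Hand-rolled equivalent of the script's Ghostscript call (file names are examples).
gs -sDEVICE=pdfwrite -dCompatibilityLevel=1.4 -dPDFSETTINGS=/ebook \
   -dNOPAUSE -dBATCH -dQUIET \
   -sOutputFile=bert-paper-min.pdf bert-paper.pdf
```

On Windows the executable is `gswin64c` (or `gswin32c`) rather than `gs`.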
+ +## Legacy PDFNet method (requires license) + +If you have a valid license and the PDFNet SDK installed, you can use the original `pdf_compressor.py` script. Note that the previously referenced `PDFNetPython3` pip package is not freely available and may not install via pip. Refer to the vendor's documentation for installation and licensing. \ No newline at end of file diff --git a/handling-pdf-files/pdf-compressor/pdf_compressor_ghostscript.py b/handling-pdf-files/pdf-compressor/pdf_compressor_ghostscript.py new file mode 100644 index 00000000..88de4062 --- /dev/null +++ b/handling-pdf-files/pdf-compressor/pdf_compressor_ghostscript.py @@ -0,0 +1,103 @@ +import os +import sys +import subprocess +import shutil + + +def get_size_format(b, factor=1024, suffix="B"): + for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]: + if b < factor: + return f"{b:.2f}{unit}{suffix}" + b /= factor + return f"{b:.2f}Y{suffix}" + + +def find_ghostscript_executable(): + candidates = [ + shutil.which('gs'), + shutil.which('gswin64c'), + shutil.which('gswin32c'), + ] + for c in candidates: + if c: + return c + return None + + +def compress_file(input_file: str, output_file: str, power: int = 2): + """Compress PDF using Ghostscript. + + power: + 0 -> /screen (lowest quality, highest compression) + 1 -> /ebook (good quality) + 2 -> /printer (high quality) [default] + 3 -> /prepress (very high quality) + 4 -> /default (Ghostscript default) + """ + if not os.path.exists(input_file): + raise FileNotFoundError(f"Input file not found: {input_file}") + if not output_file: + output_file = input_file + + initial_size = os.path.getsize(input_file) + + gs = find_ghostscript_executable() + if not gs: + raise RuntimeError( + "Ghostscript not found. Install it and ensure 'gs' (Linux/macOS) " + "or 'gswin64c'/'gswin32c' (Windows) is in PATH." 
+ ) + + settings_map = { + 0: '/screen', + 1: '/ebook', + 2: '/printer', + 3: '/prepress', + 4: '/default', + } + pdfsettings = settings_map.get(power, '/printer') + + cmd = [ + gs, + '-sDEVICE=pdfwrite', + '-dCompatibilityLevel=1.4', + f'-dPDFSETTINGS={pdfsettings}', + '-dNOPAUSE', + '-dBATCH', + '-dQUIET', + f'-sOutputFile={output_file}', + input_file, + ] + + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError as e: + print(f"Ghostscript failed: {e}") + return False + + compressed_size = os.path.getsize(output_file) + ratio = 1 - (compressed_size / initial_size) + summary = { + "Input File": input_file, + "Initial Size": get_size_format(initial_size), + "Output File": output_file, + "Compressed Size": get_size_format(compressed_size), + "Compression Ratio": f"{ratio:.3%}", + } + + print("## Summary ########################################################") + for k, v in summary.items(): + print(f"{k}: {v}") + print("###################################################################") + return True + + +if __name__ == '__main__': + if len(sys.argv) < 3: + print("Usage: python pdf_compressor_ghostscript.py [power 0-4]") + sys.exit(1) + input_file = sys.argv[1] + output_file = sys.argv[2] + power = int(sys.argv[3]) if len(sys.argv) > 3 else 2 + ok = compress_file(input_file, output_file, power) + sys.exit(0 if ok else 2) \ No newline at end of file diff --git a/handling-pdf-files/pdf-compressor/requirements.txt b/handling-pdf-files/pdf-compressor/requirements.txt index 0a664a86..9f6e5337 100644 --- a/handling-pdf-files/pdf-compressor/requirements.txt +++ b/handling-pdf-files/pdf-compressor/requirements.txt @@ -1 +1,7 @@ -PDFNetPython3==8.1.0 \ No newline at end of file +# No Python dependencies required for Ghostscript-based compressor. +# System dependency: Ghostscript +# - macOS: brew install ghostscript +# - Debian: sudo apt-get install -y ghostscript +# - Windows: https://ghostscript.com/releases/ +# +# The legacy script (pdf_compressor.py) depends on PDFNet (commercial) and a license key. \ No newline at end of file diff --git a/images/codingfleet-banner-2.png b/images/codingfleet-banner-2.png new file mode 100644 index 00000000..e95c4d27 Binary files /dev/null and b/images/codingfleet-banner-2.png differ diff --git a/images/codingfleet-banner-3.png b/images/codingfleet-banner-3.png new file mode 100644 index 00000000..9f27495e Binary files /dev/null and b/images/codingfleet-banner-3.png differ diff --git a/images/iproyal-1.png b/images/iproyal-1.png new file mode 100644 index 00000000..9e607e13 Binary files /dev/null and b/images/iproyal-1.png differ diff --git a/python-for-multimedia/compress-image/README.md b/python-for-multimedia/compress-image/README.md index 32f51450..919414cc 100644 --- a/python-for-multimedia/compress-image/README.md +++ b/python-for-multimedia/compress-image/README.md @@ -1,4 +1,56 @@ -# [How to Compress Images in Python](https://www.thepythoncode.com/article/compress-images-in-python) -To run this: -- `pip3 install -r requirements.txt` -- `python compress_image.py --help` \ No newline at end of file +# Compress Image + +Advanced Image Compressor with Batch Processing + +This script provides advanced image compression and resizing features using Python and Pillow. 
+ +## Features + +- Batch processing of multiple images or directories +- Lossy and lossless compression (PNG/WebP) +- Optional JPEG conversion +- Resize by ratio or explicit dimensions +- Preserve or strip metadata (EXIF) +- Custom output directory +- Progress bar using `tqdm` +- Detailed logging + +## Requirements + +- Python 3.6+ +- [Pillow](https://pypi.org/project/Pillow/) +- [tqdm](https://pypi.org/project/tqdm/) + +Install dependencies: + +```bash +pip install pillow tqdm +``` + +## Usage + +```bash +python compress_image.py [options] [ ...] +``` + +## Options +- `-o`, `--output-dir`: Output directory (default: same as input) +- `-q`, `--quality`: Compression quality (0-100, default: 85) +- `-r`, `--resize-ratio`: Resize ratio (0-1, default: 1.0) +- `-w`, `--width`: Output width (requires `--height`) +- `-hh`, `--height`: Output height (requires `--width`) +- `-j`, `--to-jpg`: Convert output to JPEG +- `-m`, `--no-metadata`: Strip metadata (default: preserve) +- `-l`, `--lossless`: Use lossless compression (PNG/WEBP) + +## Examples + +```bash +python compress_image.py image.jpg -r 0.5 -q 80 -j +python compress_image.py images/ -o output/ -m +python compress_image.py image.png -l +``` + +## License + +MIT License. diff --git a/python-for-multimedia/compress-image/compress_image.py b/python-for-multimedia/compress-image/compress_image.py index 6560b887..f1696aa0 100644 --- a/python-for-multimedia/compress-image/compress_image.py +++ b/python-for-multimedia/compress-image/compress_image.py @@ -1,88 +1,104 @@ import os from PIL import Image +import argparse +import logging +from tqdm import tqdm +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) def get_size_format(b, factor=1024, suffix="B"): - """ - Scale bytes to its proper byte format - e.g: - 1253656 => '1.20MB' - 1253656678 => '1.17GB' - """ + """Scale bytes to its proper byte format.""" for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]: if b < factor: return f"{b:.2f}{unit}{suffix}" b /= factor return f"{b:.2f}Y{suffix}" - - -def compress_img(image_name, new_size_ratio=0.9, quality=90, width=None, height=None, to_jpg=True): - # load the image to memory - img = Image.open(image_name) - # print the original image shape - print("[*] Image shape:", img.size) - # get the original image size in bytes - image_size = os.path.getsize(image_name) - # print the size before compression/resizing - print("[*] Size before compression:", get_size_format(image_size)) - if new_size_ratio < 1.0: - # if resizing ratio is below 1.0, then multiply width & height with this ratio to reduce image size - img = img.resize((int(img.size[0] * new_size_ratio), int(img.size[1] * new_size_ratio)), Image.LANCZOS) - # print new image shape - print("[+] New Image shape:", img.size) - elif width and height: - # if width and height are set, resize with them instead - img = img.resize((width, height), Image.LANCZOS) - # print new image shape - print("[+] New Image shape:", img.size) - # split the filename and extension - filename, ext = os.path.splitext(image_name) - # make new filename appending _compressed to the original file name - if to_jpg: - # change the extension to JPEG - new_filename = f"{filename}_compressed.jpg" - else: - # retain the same extension of the original image - new_filename = f"{filename}_compressed{ext}" +def compress_image( + input_path, + output_dir=None, + quality=85, + resize_ratio=1.0, + width=None, + height=None, + to_jpg=False, + 
preserve_metadata=True, + lossless=False, +): + """Compress an image with advanced options.""" try: - # save the image with the corresponding quality and optimize set to True - img.save(new_filename, quality=quality, optimize=True) - except OSError: - # convert the image to RGB mode first - img = img.convert("RGB") - # save the image with the corresponding quality and optimize set to True - img.save(new_filename, quality=quality, optimize=True) - print("[+] New file saved:", new_filename) - # get the new image size in bytes - new_image_size = os.path.getsize(new_filename) - # print the new size in a good format - print("[+] Size after compression:", get_size_format(new_image_size)) - # calculate the saving bytes - saving_diff = new_image_size - image_size - # print the saving percentage - print(f"[+] Image size change: {saving_diff/image_size*100:.2f}% of the original image size.") - - + img = Image.open(input_path) + logger.info(f"[*] Processing: {os.path.basename(input_path)}") + logger.info(f"[*] Original size: {get_size_format(os.path.getsize(input_path))}") + + # Resize if needed + if resize_ratio < 1.0: + new_size = (int(img.size[0] * resize_ratio), int(img.size[1] * resize_ratio)) + img = img.resize(new_size, Image.LANCZOS) + logger.info(f"[+] Resized to: {new_size}") + elif width and height: + img = img.resize((width, height), Image.LANCZOS) + logger.info(f"[+] Resized to: {width}x{height}") + + # Prepare output path + filename, ext = os.path.splitext(os.path.basename(input_path)) + output_ext = ".jpg" if to_jpg else ext + output_filename = f"{filename}_compressed{output_ext}" + output_path = os.path.join(output_dir or os.path.dirname(input_path), output_filename) + + # Save with options + save_kwargs = {"quality": quality, "optimize": True} + if not preserve_metadata: + save_kwargs["exif"] = b"" # Strip metadata + if lossless and ext.lower() in (".png", ".webp"): + save_kwargs["lossless"] = True + + try: + img.save(output_path, **save_kwargs) + except OSError: + img = img.convert("RGB") + img.save(output_path, **save_kwargs) + + logger.info(f"[+] Saved to: {output_path}") + logger.info(f"[+] New size: {get_size_format(os.path.getsize(output_path))}") + except Exception as e: + logger.error(f"[!] Error processing {input_path}: {e}") + +def batch_compress( + input_paths, + output_dir=None, + quality=85, + resize_ratio=1.0, + width=None, + height=None, + to_jpg=False, + preserve_metadata=True, + lossless=False, +): + """Compress multiple images.""" + if output_dir and not os.path.exists(output_dir): + os.makedirs(output_dir, exist_ok=True) + for path in tqdm(input_paths, desc="Compressing images"): + compress_image(path, output_dir, quality, resize_ratio, width, height, to_jpg, preserve_metadata, lossless) + if __name__ == "__main__": - import argparse - parser = argparse.ArgumentParser(description="Simple Python script for compressing and resizing images") - parser.add_argument("image", help="Target image to compress and/or resize") - parser.add_argument("-j", "--to-jpg", action="https://github.com/store_true", help="Whether to convert the image to the JPEG format") - parser.add_argument("-q", "--quality", type=int, help="Quality ranging from a minimum of 0 (worst) to a maximum of 95 (best). Default is 90", default=90) - parser.add_argument("-r", "--resize-ratio", type=float, help="Resizing ratio from 0 to 1, setting to 0.5 will multiply width & height of the image by 0.5. 
Default is 1.0", default=1.0) - parser.add_argument("-w", "--width", type=int, help="The new width image, make sure to set it with the `height` parameter") - parser.add_argument("-hh", "--height", type=int, help="The new height for the image, make sure to set it with the `width` parameter") + parser = argparse.ArgumentParser(description="Advanced Image Compressor with Batch Processing") + parser.add_argument("input", nargs='+', help="Input image(s) or directory") + parser.add_argument("-o", "--output-dir", help="Output directory (default: same as input)") + parser.add_argument("-q", "--quality", type=int, default=85, help="Compression quality (0-100)") + parser.add_argument("-r", "--resize-ratio", type=float, default=1.0, help="Resize ratio (0-1)") + parser.add_argument("-w", "--width", type=int, help="Output width (requires --height)") + parser.add_argument("-hh", "--height", type=int, help="Output height (requires --width)") + parser.add_argument("-j", "--to-jpg", action="https://github.com/store_true", help="Convert output to JPEG") + parser.add_argument("-m", "--no-metadata", action="https://github.com/store_false", help="Strip metadata") + parser.add_argument("-l", "--lossless", action="https://github.com/store_true", help="Use lossless compression (PNG/WEBP)") + args = parser.parse_args() - # print the passed arguments - print("="*50) - print("[*] Image:", args.image) - print("[*] To JPEG:", args.to_jpg) - print("[*] Quality:", args.quality) - print("[*] Resizing ratio:", args.resize_ratio) - if args.width and args.height: - print("[*] Width:", args.width) - print("[*] Height:", args.height) - print("="*50) - # compress the image - compress_img(args.image, args.resize_ratio, args.quality, args.width, args.height, args.to_jpg) \ No newline at end of file + input_paths = [] + for path in args.input: + if os.path.isdir(path): input_paths.extend(os.path.join(path, f) for f in os.listdir(path) if f.lower().endswith((".jpg",".jpeg",".png",".webp"))) + else: input_paths.append(path) + if not input_paths: logger.error("No valid images found!"); exit(1) + batch_compress(input_paths, args.output_dir, args.quality, args.resize_ratio, args.width, args.height, args.to_jpg, args.no_metadata, args.lossless) diff --git a/python-for-multimedia/recover-deleted-files/README.md b/python-for-multimedia/recover-deleted-files/README.md new file mode 100644 index 00000000..9b57b100 --- /dev/null +++ b/python-for-multimedia/recover-deleted-files/README.md @@ -0,0 +1 @@ +# [How to Recover Deleted Files with Python](https://thepythoncode.com/article/how-to-recover-deleted-file-with-python) \ No newline at end of file diff --git a/python-for-multimedia/recover-deleted-files/file_recovery.py b/python-for-multimedia/recover-deleted-files/file_recovery.py new file mode 100644 index 00000000..057995c4 --- /dev/null +++ b/python-for-multimedia/recover-deleted-files/file_recovery.py @@ -0,0 +1,552 @@ + +import os +import sys +import argparse +import struct +import time +import logging +import subprocess +import signal +from datetime import datetime, timedelta +from pathlib import Path +import binascii + +# File signatures (magic numbers) for common file types +FILE_SIGNATURES = { + 'jpg': [bytes([0xFF, 0xD8, 0xFF, 0xE0]), bytes([0xFF, 0xD8, 0xFF, 0xE1])], + 'png': [bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A])], + 'gif': [bytes([0x47, 0x49, 0x46, 0x38, 0x37, 0x61]), bytes([0x47, 0x49, 0x46, 0x38, 0x39, 0x61])], + 'pdf': [bytes([0x25, 0x50, 0x44, 0x46])], + 'zip': [bytes([0x50, 
0x4B, 0x03, 0x04])], + 'docx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])], # More specific signature + 'xlsx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])], # More specific signature + 'pptx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])], # More specific signature + 'mp3': [bytes([0x49, 0x44, 0x33])], + 'mp4': [bytes([0x00, 0x00, 0x00, 0x18, 0x66, 0x74, 0x79, 0x70])], + 'avi': [bytes([0x52, 0x49, 0x46, 0x46])], +} + +# Additional validation patterns to check after finding the signature +# This helps reduce false positives +VALIDATION_PATTERNS = { + 'docx': [b'word/', b'[Content_Types].xml'], + 'xlsx': [b'xl/', b'[Content_Types].xml'], + 'pptx': [b'ppt/', b'[Content_Types].xml'], + 'zip': [b'PK\x01\x02'], # Central directory header + 'pdf': [b'obj', b'endobj'], +} + +# File endings (trailer signatures) for some file types +FILE_TRAILERS = { + 'jpg': bytes([0xFF, 0xD9]), + 'png': bytes([0x49, 0x45, 0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82]), + 'gif': bytes([0x00, 0x3B]), + 'pdf': bytes([0x25, 0x25, 0x45, 0x4F, 0x46]), +} + +# Maximum file sizes to prevent recovering corrupted files +MAX_FILE_SIZES = { + 'jpg': 30 * 1024 * 1024, # 30MB + 'png': 50 * 1024 * 1024, # 50MB + 'gif': 20 * 1024 * 1024, # 20MB + 'pdf': 100 * 1024 * 1024, # 100MB + 'zip': 200 * 1024 * 1024, # 200MB + 'docx': 50 * 1024 * 1024, # 50MB + 'xlsx': 50 * 1024 * 1024, # 50MB + 'pptx': 100 * 1024 * 1024, # 100MB + 'mp3': 50 * 1024 * 1024, # 50MB + 'mp4': 1024 * 1024 * 1024, # 1GB + 'avi': 1024 * 1024 * 1024, # 1GB +} + +class FileRecoveryTool: + def __init__(self, source, output_dir, file_types=None, deep_scan=False, + block_size=512, log_level=logging.INFO, skip_existing=True, + max_scan_size=None, timeout_minutes=None): + """ + Initialize the file recovery tool + + Args: + source (str): Path to the source device or directory + output_dir (str): Directory to save recovered files + file_types (list): List of file types to recover + deep_scan (bool): Whether to perform a deep scan + block_size (int): Block size for reading data + log_level (int): Logging level + skip_existing (bool): Skip existing files in output directory + max_scan_size (int): Maximum number of bytes to scan + timeout_minutes (int): Timeout in minutes + """ + self.source = source + self.output_dir = Path(output_dir) + self.file_types = file_types if file_types else list(FILE_SIGNATURES.keys()) + self.deep_scan = deep_scan + self.block_size = block_size + self.skip_existing = skip_existing + self.max_scan_size = max_scan_size + self.timeout_minutes = timeout_minutes + self.timeout_reached = False + + # Setup logging + self.setup_logging(log_level) + + # Create output directory if it doesn't exist + self.output_dir.mkdir(parents=True, exist_ok=True) + + # Statistics + self.stats = { + 'total_files_recovered': 0, + 'recovered_by_type': {}, + 'start_time': time.time(), + 'bytes_scanned': 0, + 'false_positives': 0 + } + + for file_type in self.file_types: + self.stats['recovered_by_type'][file_type] = 0 + + def setup_logging(self, log_level): + """Set up logging configuration""" + logging.basicConfig( + level=log_level, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), + logging.FileHandler(f"recovery_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log") + ] + ) + self.logger = logging.getLogger('file_recovery') + + def _setup_timeout(self): + """Set up a timeout handler""" + if self.timeout_minutes: + def timeout_handler(signum, frame): + self.logger.warning(f"Timeout of 
{self.timeout_minutes} minutes reached!") + self.timeout_reached = True + + # Set the timeout + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(int(self.timeout_minutes * 60)) + + def get_device_size(self): + """Get the size of the device or file""" + if os.path.isfile(self.source): + # Regular file + return os.path.getsize(self.source) + else: + # Block device + try: + # Try using blockdev command (Linux) + result = subprocess.run(['blockdev', '--getsize64', self.source], + capture_output=True, text=True, check=True) + return int(result.stdout.strip()) + except (subprocess.SubprocessError, FileNotFoundError): + try: + # Try using ioctl (requires root) + import fcntl + with open(self.source, 'rb') as fd: + # BLKGETSIZE64 = 0x80081272 + buf = bytearray(8) + fcntl.ioctl(fd, 0x80081272, buf) + return struct.unpack('L', buf)[0] + except: + # Last resort: try to seek to the end + try: + with open(self.source, 'rb') as fd: + fd.seek(0, 2) # Seek to end + return fd.tell() + except: + self.logger.warning("Could not determine device size. Using fallback size.") + # Fallback to a reasonable size for testing + return 1024 * 1024 * 1024 # 1GB + + def scan_device(self): + """Scan the device for deleted files""" + self.logger.info(f"Starting scan of {self.source}") + self.logger.info(f"Looking for file types: {', '.join(self.file_types)}") + + try: + # Get device size + device_size = self.get_device_size() + self.logger.info(f"Device size: {self._format_size(device_size)}") + + # Set up timeout if specified + if self.timeout_minutes: + self._setup_timeout() + self.logger.info(f"Timeout set for {self.timeout_minutes} minutes") + + with open(self.source, 'rb', buffering=0) as device: # buffering=0 for direct I/O + self._scan_device_data(device, device_size) + + except (IOError, OSError) as e: + self.logger.error(f"Error accessing source: {e}") + return False + + self._print_summary() + return True + + def _scan_device_data(self, device, device_size): + """Scan the device data for file signatures""" + position = 0 + + # Limit scan size if specified + if self.max_scan_size and self.max_scan_size < device_size: + self.logger.info(f"Limiting scan to first {self._format_size(self.max_scan_size)} of device") + device_size = self.max_scan_size + + # Create subdirectories for each file type + for file_type in self.file_types: + (self.output_dir / file_type).mkdir(exist_ok=True) + + scan_start_time = time.time() + last_progress_time = scan_start_time + + # Read the device in blocks + while position < device_size: + # Check if timeout reached + if self.timeout_reached: + self.logger.warning("Stopping scan due to timeout") + break + + try: + # Seek to position first + device.seek(position) + + # Read a block of data + data = device.read(self.block_size) + if not data: + break + + self.stats['bytes_scanned'] += len(data) + + # Check for file signatures in this block + for file_type in self.file_types: + signatures = FILE_SIGNATURES.get(file_type, []) + + for signature in signatures: + sig_pos = data.find(signature) + + if sig_pos != -1: + # Found a file signature, try to recover the file + absolute_pos = position + sig_pos + device.seek(absolute_pos) + + self.logger.debug(f"Found {file_type} signature at position {absolute_pos}") + + # Recover the file + if self._recover_file(device, file_type, absolute_pos): + self.stats['total_files_recovered'] += 1 + self.stats['recovered_by_type'][file_type] += 1 + else: + self.stats['false_positives'] += 1 + + # Reset position to continue scanning + 
device.seek(position + self.block_size) + + # Update position and show progress + position += self.block_size + current_time = time.time() + + # Show progress every 5MB or 10 seconds, whichever comes first + if (position % (5 * 1024 * 1024) == 0) or (current_time - last_progress_time >= 10): + percent = (position / device_size) * 100 if device_size > 0 else 0 + elapsed = current_time - self.stats['start_time'] + + # Calculate estimated time remaining + if position > 0 and device_size > 0: + bytes_per_second = position / elapsed if elapsed > 0 else 0 + remaining_bytes = device_size - position + eta_seconds = remaining_bytes / bytes_per_second if bytes_per_second > 0 else 0 + eta_str = str(timedelta(seconds=int(eta_seconds))) + else: + eta_str = "unknown" + + self.logger.info(f"Progress: {percent:.2f}% ({self._format_size(position)} / {self._format_size(device_size)}) - " + f"{self.stats['total_files_recovered']} files recovered - " + f"Elapsed: {timedelta(seconds=int(elapsed))} - ETA: {eta_str}") + last_progress_time = current_time + + except Exception as e: + self.logger.error(f"Error reading at position {position}: {e}") + position += self.block_size # Skip this block and continue + + def _validate_file_content(self, data, file_type): + """ + Additional validation to reduce false positives + + Args: + data: File data to validate + file_type: Type of file to validate + + Returns: + bool: True if file content appears valid + """ + # Check minimum size + if len(data) < 100: + return False + + # Check for validation patterns + patterns = VALIDATION_PATTERNS.get(file_type, []) + if patterns: + for pattern in patterns: + if pattern in data: + return True + return False # None of the patterns were found + + # For file types without specific validation patterns + return True + + def _recover_file(self, device, file_type, start_position): + """ + Recover a file of the given type starting at the given position + + Args: + device: Open file handle to the device + file_type: Type of file to recover + start_position: Starting position of the file + + Returns: + bool: True if file was recovered successfully + """ + max_size = MAX_FILE_SIZES.get(file_type, 10 * 1024 * 1024) # Default to 10MB + trailer = FILE_TRAILERS.get(file_type) + + # Generate a unique filename + filename = f"{file_type}_{start_position}_{int(time.time())}_{binascii.hexlify(os.urandom(4)).decode()}.{file_type}" + output_path = self.output_dir / file_type / filename + + if self.skip_existing and output_path.exists(): + self.logger.debug(f"Skipping existing file: {output_path}") + return False + + # Save the current position to restore later + current_pos = device.tell() + + try: + # Seek to the start of the file + device.seek(start_position) + + # Read the file data + if trailer and self.deep_scan: + # If we know the trailer and deep scan is enabled, read until trailer + file_data = self._read_until_trailer(device, trailer, max_size) + else: + # Otherwise, use heuristics to determine file size + file_data = self._read_file_heuristic(device, file_type, max_size) + + if not file_data or len(file_data) < 100: # Ignore very small files + return False + + # Additional validation to reduce false positives + if not self._validate_file_content(file_data, file_type): + self.logger.debug(f"Skipping invalid {file_type} file at position {start_position}") + return False + + # Write the recovered file + with open(output_path, 'wb') as f: + f.write(file_data) + + self.logger.info(f"Recovered {file_type} file: {filename} 
({self._format_size(len(file_data))})") + return True + + except Exception as e: + self.logger.error(f"Error recovering file at position {start_position}: {e}") + return False + finally: + # Restore the original position + try: + device.seek(current_pos) + except: + pass # Ignore seek errors in finally block + + def _read_until_trailer(self, device, trailer, max_size): + """Read data until a trailer signature is found or max size is reached""" + buffer = bytearray() + chunk_size = 4096 + + while len(buffer) < max_size: + try: + chunk = device.read(chunk_size) + if not chunk: + break + + buffer.extend(chunk) + + # Check if trailer is in the buffer + trailer_pos = buffer.find(trailer, max(0, len(buffer) - len(trailer) - chunk_size)) + if trailer_pos != -1: + # Found trailer, return data up to and including the trailer + return buffer[:trailer_pos + len(trailer)] + except Exception as e: + self.logger.error(f"Error reading chunk: {e}") + break + + # If we reached max size without finding a trailer, return what we have + return buffer if len(buffer) > 100 else None + + def _read_file_heuristic(self, device, file_type, max_size): + """ + Use heuristics to determine file size when trailer is unknown + This is a simplified approach - real tools use more sophisticated methods + """ + buffer = bytearray() + chunk_size = 4096 + valid_chunks = 0 + invalid_chunks = 0 + + # For Office documents and ZIP files, read a larger initial chunk to validate + initial_chunk_size = 16384 if file_type in ['docx', 'xlsx', 'pptx', 'zip'] else chunk_size + + # Read initial chunk for validation + initial_chunk = device.read(initial_chunk_size) + if not initial_chunk: + return None + + buffer.extend(initial_chunk) + + # For Office documents, check if it contains required elements + if file_type in ['docx', 'xlsx', 'pptx', 'zip']: + # Basic validation for Office Open XML files + if file_type == 'docx' and b'word/' not in initial_chunk: + return None + if file_type == 'xlsx' and b'xl/' not in initial_chunk: + return None + if file_type == 'pptx' and b'ppt/' not in initial_chunk: + return None + if file_type == 'zip' and b'PK\x01\x02' not in initial_chunk: + return None + + # Continue reading chunks + while len(buffer) < max_size: + try: + chunk = device.read(chunk_size) + if not chunk: + break + + buffer.extend(chunk) + + # Simple heuristic: for binary files, check if chunk contains too many non-printable characters + # This is a very basic approach and would need to be refined for real-world use + if file_type in ['jpg', 'png', 'gif', 'pdf', 'zip', 'docx', 'xlsx', 'pptx', 'mp3', 'mp4', 'avi']: + # For binary files, we continue reading until we hit max size or end of device + valid_chunks += 1 + + # For ZIP-based formats, check for corruption + if file_type in ['zip', 'docx', 'xlsx', 'pptx'] and b'PK' not in chunk and valid_chunks > 10: + # If we've read several chunks and don't see any more PK signatures, we might be past the file + invalid_chunks += 1 + + else: + # For text files, we could check for text validity + printable_ratio = sum(32 <= b <= 126 or b in (9, 10, 13) for b in chunk) / len(chunk) + if printable_ratio < 0.7: # If less than 70% printable characters + invalid_chunks += 1 + else: + valid_chunks += 1 + + # If we have too many invalid chunks in a row, stop + if invalid_chunks > 3: + return buffer[:len(buffer) - (invalid_chunks * chunk_size)] + except Exception as e: + self.logger.error(f"Error reading chunk in heuristic: {e}") + break + + return buffer + + def _format_size(self, size_bytes): + """Format 
size in bytes to a human-readable string""" + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if size_bytes < 1024 or unit == 'TB': + return f"{size_bytes:.2f} {unit}" + size_bytes /= 1024 + + def _print_summary(self): + """Print a summary of the recovery operation""" + elapsed = time.time() - self.stats['start_time'] + + self.logger.info("=" * 50) + self.logger.info("Recovery Summary") + self.logger.info("=" * 50) + self.logger.info(f"Total files recovered: {self.stats['total_files_recovered']}") + self.logger.info(f"False positives detected and skipped: {self.stats['false_positives']}") + self.logger.info(f"Total data scanned: {self._format_size(self.stats['bytes_scanned'])}") + self.logger.info(f"Time elapsed: {timedelta(seconds=int(elapsed))}") + self.logger.info("Files recovered by type:") + + for file_type, count in self.stats['recovered_by_type'].items(): + if count > 0: + self.logger.info(f" - {file_type}: {count}") + + if self.timeout_reached: + self.logger.info("Note: Scan was stopped due to timeout") + + self.logger.info("=" * 50) + + +def main(): + """Main function to parse arguments and run the recovery tool""" + parser = argparse.ArgumentParser(description='File Recovery Tool - Recover deleted files from storage devices') + + parser.add_argument('source', help='Source device or directory to recover files from (e.g., /dev/sdb, /media/usb)') + parser.add_argument('output', help='Directory to save recovered files') + + parser.add_argument('-t', '--types', nargs='+', choices=FILE_SIGNATURES.keys(), default=None, + help='File types to recover (default: all supported types)') + + parser.add_argument('-d', '--deep-scan', action='https://github.com/store_true', + help='Perform a deep scan (slower but more thorough)') + + parser.add_argument('-b', '--block-size', type=int, default=512, + help='Block size for reading data (default: 512 bytes)') + + parser.add_argument('-v', '--verbose', action='https://github.com/store_true', + help='Enable verbose output') + + parser.add_argument('-q', '--quiet', action='https://github.com/store_true', + help='Suppress all output except errors') + + parser.add_argument('--no-skip', action='https://github.com/store_true', + help='Do not skip existing files in output directory') + + parser.add_argument('--max-size', type=int, + help='Maximum size to scan in MB (e.g., 1024 for 1GB)') + + parser.add_argument('--timeout', type=int, default=None, + help='Stop scanning after specified minutes') + + args = parser.parse_args() + + # Set logging level based on verbosity + if args.quiet: + log_level = logging.ERROR + elif args.verbose: + log_level = logging.DEBUG + else: + log_level = logging.INFO + + # Convert max size from MB to bytes if specified + max_scan_size = args.max_size * 1024 * 1024 if args.max_size else None + + # Create and run the recovery tool + recovery_tool = FileRecoveryTool( + source=args.source, + output_dir=args.output, + file_types=args.types, + deep_scan=args.deep_scan, + block_size=args.block_size, + log_level=log_level, + skip_existing=not args.no_skip, + max_scan_size=max_scan_size, + timeout_minutes=args.timeout + ) + + try: + recovery_tool.scan_device() + except KeyboardInterrupt: + print("\nRecovery process interrupted by user.") + recovery_tool._print_summary() + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/web-scraping/youtube-extractor/extract_video_info.py b/web-scraping/youtube-extractor/extract_video_info.py index 042ce4f8..bed184b0 100644 --- 
a/web-scraping/youtube-extractor/extract_video_info.py +++ b/web-scraping/youtube-extractor/extract_video_info.py @@ -1,92 +1,150 @@ -from requests_html import HTMLSession -from bs4 import BeautifulSoup as bs +import requests +from bs4 import BeautifulSoup import re import json - -# init session -session = HTMLSession() - +import argparse def get_video_info(url): - # download HTML code - response = session.get(url) - # execute Javascript - response.html.render(timeout=60) - # create beautiful soup object to parse HTML - soup = bs(response.html.html, "html.parser") - # open("index.html", "w").write(response.html.html) - # initialize the result - result = {} - # video title - result["title"] = soup.find("meta", itemprop="name")['content'] - # video views - result["views"] = soup.find("meta", itemprop="interactionCount")['content'] - # video description - result["description"] = soup.find("meta", itemprop="description")['content'] - # date published - result["date_published"] = soup.find("meta", itemprop="datePublished")['content'] - # get the duration of the video - result["duration"] = soup.find("span", {"class": "ytp-time-duration"}).text - # get the video tags - result["tags"] = ', '.join([ meta.attrs.get("content") for meta in soup.find_all("meta", {"property": "og:video:tag"}) ]) - - # Additional video and channel information (with help from: https://stackoverflow.com/a/68262735) - data = re.search(r"var ytInitialData = ({.*?});", soup.prettify()).group(1) - data_json = json.loads(data) - videoPrimaryInfoRenderer = data_json['contents']['twoColumnWatchNextResults']['results']['results']['contents'][0]['videoPrimaryInfoRenderer'] - videoSecondaryInfoRenderer = data_json['contents']['twoColumnWatchNextResults']['results']['results']['contents'][1]['videoSecondaryInfoRenderer'] - # number of likes - likes_label = videoPrimaryInfoRenderer['videoActions']['menuRenderer']['topLevelButtons'][0]['toggleButtonRenderer']['defaultText']['accessibility']['accessibilityData']['label'] # "No likes" or "###,### likes" - likes_str = likes_label.split(' ')[0].replace(',','') - result["likes"] = '0' if likes_str == 'No' else likes_str - # number of likes (old way) doesn't always work - # text_yt_formatted_strings = soup.find_all("yt-formatted-string", {"id": "text", "class": "ytd-toggle-button-renderer"}) - # result["likes"] = ''.join([ c for c in text_yt_formatted_strings[0].attrs.get("aria-label") if c.isdigit() ]) - # result["likes"] = 0 if result['likes'] == '' else int(result['likes']) - # number of dislikes - YouTube does not publish this anymore... 
- # result["dislikes"] = ''.join([ c for c in text_yt_formatted_strings[1].attrs.get("aria-label") if c.isdigit() ]) - # result["dislikes"] = '0' if result['dislikes'] == '' else result['dislikes'] - result['dislikes'] = 'UNKNOWN' - # channel details - channel_tag = soup.find("meta", itemprop="channelId")['content'] - # channel name - channel_name = soup.find("span", itemprop="author").next.next['content'] - # channel URL - # channel_url = soup.find("span", itemprop="author").next['href'] - channel_url = f"https://www.youtube.com/%7Bchannel_tag%7D" - # number of subscribers as str - channel_subscribers = videoSecondaryInfoRenderer['owner']['videoOwnerRenderer']['subscriberCountText']['accessibility']['accessibilityData']['label'] - # channel details (old way) - # channel_tag = soup.find("yt-formatted-string", {"class": "ytd-channel-name"}).find("a") - # # channel name (old way) - # channel_name = channel_tag.text - # # channel URL (old way) - # channel_url = f"https://www.youtube.com{channel_tag['href']}" - # number of subscribers as str (old way) - # channel_subscribers = soup.find("yt-formatted-string", {"id": "owner-sub-count"}).text.strip() - result['channel'] = {'name': channel_name, 'url': channel_url, 'subscribers': channel_subscribers} - return result + """ + Extract video information from YouTube using modern approach + """ + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } + + try: + # Download HTML code + response = requests.get(url, headers=headers) + response.raise_for_status() + + # Create beautiful soup object to parse HTML + soup = BeautifulSoup(response.text, "html.parser") + + # Initialize the result + result = {} + + # Extract ytInitialData which contains all the video information + data_match = re.search(r'var ytInitialData = ({.*?});', response.text) + if not data_match: + raise Exception("Could not find ytInitialData in page") + + data_json = json.loads(data_match.group(1)) + + # Get the main content sections + contents = data_json['contents']['twoColumnWatchNextResults']['results']['results']['contents'] + + # Extract video information from videoPrimaryInfoRenderer + if 'videoPrimaryInfoRenderer' in contents[0]: + primary = contents[0]['videoPrimaryInfoRenderer'] + + # Video title + result["title"] = primary['title']['runs'][0]['text'] + + # Video views + result["views"] = primary['viewCount']['videoViewCountRenderer']['viewCount']['simpleText'] + + # Date published + result["date_published"] = primary['dateText']['simpleText'] + + # Extract channel information from videoSecondaryInfoRenderer + secondary = None + if 'videoSecondaryInfoRenderer' in contents[1]: + secondary = contents[1]['videoSecondaryInfoRenderer'] + owner = secondary['owner']['videoOwnerRenderer'] + + # Channel name + channel_name = owner['title']['runs'][0]['text'] + + # Channel ID + channel_id = owner['navigationEndpoint']['browseEndpoint']['browseId'] + + # Channel URL - FIXED with proper /channel/ path + channel_url = f"https://www.youtube.com/channel/%7Bchannel_id%7D" + + # Number of subscribers + channel_subscribers = owner['subscriberCountText']['accessibility']['accessibilityData']['label'] + + result['channel'] = { + 'name': channel_name, + 'url': channel_url, + 'subscribers': channel_subscribers + } + + # Extract video description + if secondary and 'attributedDescription' in secondary: + description_runs = secondary['attributedDescription']['content'] + 
result["description"] = description_runs + else: + result["description"] = "Description not available" + + # Try to extract video duration from player overlay + # This is a fallback approach since the original method doesn't work + duration_match = re.search(r'"approxDurationMs":"(\d+)"', response.text) + if duration_match: + duration_ms = int(duration_match.group(1)) + minutes = duration_ms // 60000 + seconds = (duration_ms % 60000) // 1000 + result["duration"] = f"{minutes}:{seconds:02d}" + else: + result["duration"] = "Duration not available" + + # Extract video tags if available + video_tags = [] + if 'keywords' in data_json.get('metadata', {}).get('videoMetadataRenderer', {}): + video_tags = data_json['metadata']['videoMetadataRenderer']['keywords'] + result["tags"] = ', '.join(video_tags) if video_tags else "No tags available" + + # Extract likes (modern approach) + result["likes"] = "Likes count not available" + result["dislikes"] = "UNKNOWN" # YouTube no longer shows dislikes + + # Try to find likes in the new structure + for content in contents: + if 'compositeVideoPrimaryInfoRenderer' in content: + composite = content['compositeVideoPrimaryInfoRenderer'] + if 'likeButton' in composite: + like_button = composite['likeButton'] + if 'toggleButtonRenderer' in like_button: + toggle = like_button['toggleButtonRenderer'] + if 'defaultText' in toggle: + default_text = toggle['defaultText'] + if 'accessibility' in default_text: + accessibility = default_text['accessibility'] + if 'accessibilityData' in accessibility: + label = accessibility['accessibilityData']['label'] + if 'like' in label.lower(): + result["likes"] = label + + return result + + except Exception as e: + raise Exception(f"Error extracting video info: {str(e)}") if __name__ == "__main__": - import argparse parser = argparse.ArgumentParser(description="YouTube Video Data Extractor") parser.add_argument("url", help="URL of the YouTube video") args = parser.parse_args() + # parse the video URL from command line url = args.url - data = get_video_info(url) + try: + data = get_video_info(url) - # print in nice format - print(f"Title: {data['title']}") - print(f"Views: {data['views']}") - print(f"Published at: {data['date_published']}") - print(f"Video Duration: {data['duration']}") - print(f"Video tags: {data['tags']}") - print(f"Likes: {data['likes']}") - print(f"Dislikes: {data['dislikes']}") - print(f"\nDescription: {data['description']}\n") - print(f"\nChannel Name: {data['channel']['name']}") - print(f"Channel URL: {data['channel']['url']}") - print(f"Channel Subscribers: {data['channel']['subscribers']}") + # print in nice format + print(f"Title: {data['title']}") + print(f"Views: {data['views']}") + print(f"Published at: {data['date_published']}") + print(f"Video Duration: {data['duration']}") + print(f"Video tags: {data['tags']}") + print(f"Likes: {data['likes']}") + print(f"Dislikes: {data['dislikes']}") + print(f"\nDescription: {data['description']}\n") + print(f"\nChannel Name: {data['channel']['name']}") + print(f"Channel URL: {data['channel']['url']}") + print(f"Channel Subscribers: {data['channel']['subscribers']}") + + except Exception as e: + print(f"Error: {e}") + print("\nNote: YouTube frequently changes its structure, so this script may need updates.") \ No newline at end of file diff --git a/web-scraping/youtube-transcript-summarizer/README.md b/web-scraping/youtube-transcript-summarizer/README.md new file mode 100644 index 00000000..a3df25a0 --- /dev/null +++ 
b/web-scraping/youtube-transcript-summarizer/README.md
@@ -0,0 +1 @@
+# [YouTube Video Transcription Summarization with Python](https://thepythoncode.com/article/youtube-video-transcription-and-summarization-with-python)
\ No newline at end of file
diff --git a/web-scraping/youtube-transcript-summarizer/requirements.txt b/web-scraping/youtube-transcript-summarizer/requirements.txt
new file mode 100644
index 00000000..865ee3b5
--- /dev/null
+++ b/web-scraping/youtube-transcript-summarizer/requirements.txt
@@ -0,0 +1,5 @@
+nltk
+pytube
+youtube_transcript_api
+colorama
+openai
diff --git a/web-scraping/youtube-transcript-summarizer/youtube_transcript_summarizer.py b/web-scraping/youtube-transcript-summarizer/youtube_transcript_summarizer.py
new file mode 100644
index 00000000..bdb80f54
--- /dev/null
+++ b/web-scraping/youtube-transcript-summarizer/youtube_transcript_summarizer.py
@@ -0,0 +1,319 @@
+import os
+import sys
+import nltk
+import pytube
+from youtube_transcript_api import YouTubeTranscriptApi
+from nltk.corpus import stopwords
+from nltk.tokenize import sent_tokenize, word_tokenize
+from nltk.probability import FreqDist
+from heapq import nlargest
+from urllib.parse import urlparse, parse_qs
+import textwrap
+from colorama import Fore, Back, Style, init
+from openai import OpenAI
+
+# Initialize colorama for cross-platform colored terminal output
+init(autoreset=True)
+
+# Download necessary NLTK data
+nltk.download('punkt_tab', quiet=True)
+nltk.download('punkt', quiet=True)
+nltk.download('stopwords', quiet=True)
+
+# Initialize OpenAI client from environment variable
+# Expect the OpenRouter API key to be provided via OPENROUTER_API_KEY
+api_key = os.getenv("OPENROUTER_API_KEY")
+if not api_key:
+    print(Fore.RED + "Error: the OPENROUTER_API_KEY environment variable is not set.")
+    sys.exit(1)
+else:
+    client = OpenAI(
+        base_url="https://openrouter.ai/api/v1",
+        api_key=api_key,
+    )
+
+def extract_video_id(youtube_url):
+    """Extract the video ID from a YouTube URL."""
+    parsed_url = urlparse(youtube_url)
+
+    if parsed_url.netloc == 'youtu.be':
+        return parsed_url.path[1:]
+
+    if parsed_url.netloc in ('www.youtube.com', 'youtube.com'):
+        if parsed_url.path == '/watch':
+            return parse_qs(parsed_url.query)['v'][0]
+        elif parsed_url.path.startswith('/embed/'):
+            return parsed_url.path.split('/')[2]
+        elif parsed_url.path.startswith('/v/'):
+            return parsed_url.path.split('/')[2]
+
+    # If no match found
+    raise ValueError(f"Could not extract video ID from URL: {youtube_url}")
+
+def get_transcript(video_id):
+    """Get the transcript of a YouTube video."""
+    try:
+        youtube_transcript_api = YouTubeTranscriptApi()
+        fetched_transcript = youtube_transcript_api.fetch(video_id)
+        full_transcript = " ".join([snippet.text for snippet in fetched_transcript.snippets])
+        return full_transcript.strip()
+    except Exception as e:
+        return f"Error retrieving transcript: {str(e)}."
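+
+# Note on the call above: the instance-based fetch()/.snippets usage assumes
+# youtube_transcript_api 1.x. As a hedged sketch only, a rough equivalent on the
+# older 0.x API would be the module-level helper:
+#     transcript = YouTubeTranscriptApi.get_transcript(video_id)
+#     full_transcript = " ".join(chunk["text"] for chunk in transcript)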
+ +def summarize_text_nltk(text, num_sentences=5): + """Summarize text using frequency-based extractive summarization with NLTK.""" + if not text or text.startswith("Error") or text.startswith("Transcript not available"): + return text + + # Tokenize the text into sentences and words + sentences = sent_tokenize(text) + + # If there are fewer sentences than requested, return all sentences + if len(sentences) <= num_sentences: + return text + + # Tokenize words and remove stopwords + stop_words = set(stopwords.words('english')) + words = word_tokenize(text.lower()) + words = [word for word in words if word.isalnum() and word not in stop_words] + + # Calculate word frequencies + freq = FreqDist(words) + + # Score sentences based on word frequencies + sentence_scores = {} + for i, sentence in enumerate(sentences): + for word in word_tokenize(sentence.lower()): + if word in freq: + if i in sentence_scores: + sentence_scores[i] += freq[word] + else: + sentence_scores[i] = freq[word] + + # Get the top N sentences with highest scores + summary_sentences_indices = nlargest(num_sentences, sentence_scores, key=sentence_scores.get) + summary_sentences_indices.sort() # Sort to maintain original order + + # Construct the summary + summary = ' '.join([sentences[i] for i in summary_sentences_indices]) + return summary + +def summarize_text_ai(text, video_title, num_sentences=5): + """Summarize text using the Mistral AI model via OpenRouter.""" + if not text or text.startswith("Error") or text.startswith("Transcript not available"): + return text + + # Truncate text if it's too long (models often have token limits) + max_chars = 15000 # Adjust based on model's context window + truncated_text = text[:max_chars] if len(text) > max_chars else text + + prompt = f"""Please provide a concise summary of the following YouTube video transcript. +Title: {video_title} + +Transcript: +{truncated_text} + +Create a clear, informative summary that captures the main points and key insights from the video. +Your summary should be approximately {num_sentences} sentences long. 
+""" + + try: + completion = client.chat.completions.create( + model="mistralai/mistral-small-3.1-24b-instruct:free", + messages=[ + { + "role": "user", + "content": [ + { + "type": "text", + "text": prompt + } + ] + } + ] + ) + return completion.choices[0].message.content + except Exception as e: + return f"Error generating AI summary: {str(e)}" + +def summarize_youtube_video(youtube_url, num_sentences=5): + """Main function to summarize a YouTube video's transcription.""" + try: + video_id = extract_video_id(youtube_url) + transcript = get_transcript(video_id) + + # Get video title for context + try: + yt = pytube.YouTube(youtube_url) + video_title = yt.title + + except Exception as e: + video_title = "Unknown Title" + + + # Generate both summaries + print(Fore.YELLOW + f"Generating AI summary with {num_sentences} sentences...") + ai_summary = summarize_text_ai(transcript, video_title, num_sentences) + + print(Fore.YELLOW + f"Generating NLTK summary with {num_sentences} sentences...") + nltk_summary = summarize_text_nltk(transcript, num_sentences) + + return { + "video_title": video_title, + "video_id": video_id, + "ai_summary": ai_summary, + "nltk_summary": nltk_summary, + "full_transcript_length": len(transcript.split()), + "nltk_summary_length": len(nltk_summary.split()), + "ai_summary_length": len(ai_summary.split()) if not ai_summary.startswith("Error") else 0 + } + except Exception as e: + return {"error": str(e)} + +def format_time(seconds): + """Convert seconds to a readable time format.""" + hours, remainder = divmod(seconds, 3600) + minutes, seconds = divmod(remainder, 60) + + if hours > 0: + return f"{hours}h {minutes}m {seconds}s" + elif minutes > 0: + return f"{minutes}m {seconds}s" + else: + return f"{seconds}s" + +def format_number(number): + """Format large numbers with commas for readability.""" + return "{:,}".format(number) + +def print_boxed_text(text, width=80, title=None, color=Fore.WHITE): + """Print text in a nice box with optional title.""" + wrapper = textwrap.TextWrapper(width=width-4) # -4 for the box margins + wrapped_text = wrapper.fill(text) + lines = wrapped_text.split('\n') + + # Print top border with optional title + if title: + title_space = width - 4 - len(title) + left_padding = title_space // 2 + right_padding = title_space - left_padding + print(color + '┌' + '─' * left_padding + title + '─' * right_padding + '┐') + else: + print(color + '┌' + '─' * (width-2) + '┐') + + # Print content + for line in lines: + padding = width - 2 - len(line) + print(color + '│ ' + line + ' ' * padding + '│') + + # Print bottom border + print(color + '└' + '─' * (width-2) + '┘') + +def print_summary_result(result, width=80): + """Print the summary result in a nicely formatted way.""" + if "error" in result: + print_boxed_text(f"Error: {result['error']}", width=width, title="ERROR", color=Fore.RED) + return + + # Terminal width + terminal_width = width + + # Print header with video information + print("\n" + Fore.CYAN + "=" * terminal_width) + print(Fore.CYAN + Style.BRIGHT + result['video_title'].center(terminal_width)) + print(Fore.CYAN + "=" * terminal_width + "\n") + + # Video metadata section + print(Fore.YELLOW + Style.BRIGHT + "VIDEO INFORMATION".center(terminal_width)) + print(Fore.YELLOW + "─" * terminal_width) + + # Two-column layout for metadata + col_width = terminal_width // 2 - 2 + + # Row 3 + print(f"{Fore.GREEN}Video ID: {Fore.WHITE}{result['video_id']:<{col_width}}" + f"{Fore.GREEN}URL: {Fore.WHITE}https://youtu.be/{result['video_id']}") + + 
print(Fore.YELLOW + "─" * terminal_width + "\n") + + # AI Summary section + ai_compression = "N/A" + if result['ai_summary_length'] > 0: + ai_compression = round((1 - result['ai_summary_length'] / result['full_transcript_length']) * 100) + + ai_summary_title = f" AI SUMMARY ({result['ai_summary_length']} words, condensed {ai_compression}% from {result['full_transcript_length']} words) " + + print(Fore.GREEN + Style.BRIGHT + ai_summary_title.center(terminal_width)) + print(Fore.GREEN + "─" * terminal_width) + + # Print the AI summary with proper wrapping + wrapper = textwrap.TextWrapper(width=terminal_width-4, + initial_indent=' ', + subsequent_indent=' ') + + # Split AI summary into paragraphs and print each + ai_paragraphs = result['ai_summary'].split('\n') + for paragraph in ai_paragraphs: + if paragraph.strip(): # Skip empty paragraphs + print(wrapper.fill(paragraph)) + print() # Empty line between paragraphs + + print(Fore.GREEN + "─" * terminal_width + "\n") + + # NLTK Summary section + nltk_compression = round((1 - result['nltk_summary_length'] / result['full_transcript_length']) * 100) + nltk_summary_title = f" NLTK SUMMARY ({result['nltk_summary_length']} words, condensed {nltk_compression}% from {result['full_transcript_length']} words) " + + print(Fore.MAGENTA + Style.BRIGHT + nltk_summary_title.center(terminal_width)) + print(Fore.MAGENTA + "─" * terminal_width) + + # Split NLTK summary into paragraphs and wrap each + paragraphs = result['nltk_summary'].split('. ') + formatted_paragraphs = [] + + current_paragraph = "" + for sentence in paragraphs: + if not sentence.endswith('.'): + sentence += '.' + + if len(current_paragraph) + len(sentence) + 1 <= 150: # Arbitrary length for paragraph + current_paragraph += " " + sentence if current_paragraph else sentence + else: + if current_paragraph: + formatted_paragraphs.append(current_paragraph) + current_paragraph = sentence + + if current_paragraph: + formatted_paragraphs.append(current_paragraph) + + # Print each paragraph + for paragraph in formatted_paragraphs: + print(wrapper.fill(paragraph)) + print() # Empty line between paragraphs + + print(Fore.MAGENTA + "─" * terminal_width + "\n") + + +if __name__ == "__main__": + # Get terminal width + try: + terminal_width = os.get_terminal_size().columns + # Limit width to reasonable range + terminal_width = max(80, min(terminal_width, 120)) + except: + terminal_width = 80 # Default if can't determine + + # Print welcome banner + print(Fore.CYAN + Style.BRIGHT + "\n" + "=" * terminal_width) + print(Fore.CYAN + Style.BRIGHT + "YOUTUBE VIDEO SUMMARIZER".center(terminal_width)) + print(Fore.CYAN + Style.BRIGHT + "=" * terminal_width + "\n") + + youtube_url = input(Fore.GREEN + "Enter YouTube video URL: " + Fore.WHITE) + + num_sentences_input = input(Fore.GREEN + "Enter number of sentences for summaries (default 5): " + Fore.WHITE) + num_sentences = int(num_sentences_input) if num_sentences_input.strip() else 5 + + print(Fore.YELLOW + "\nFetching and analyzing video transcript... Please wait...\n") + + result = summarize_youtube_video(youtube_url, num_sentences) + print_summary_result(result, width=terminal_width)
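+
+# Example run (illustrative only; real output depends on the chosen video and on
+# the OpenRouter response). The key value shown is a placeholder, not a real key:
+#
+#   $ pip install -r requirements.txt
+#   $ export OPENROUTER_API_KEY="<your OpenRouter API key>"
+#   $ python youtube_transcript_summarizer.py
+#   Enter YouTube video URL: https://youtu.be/<video-id>
+#   Enter number of sentences for summaries (default 5): 5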