diff --git a/README.md b/README.md
index 81442a70..ddf5f25d 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,13 @@
-
+
+
+
+
+
+
+
+
+
+
# Python Code Tutorials
This is a repository of all the tutorials of [The Python Code](https://www.thepythoncode.com) website.
## List of Tutorials
@@ -178,6 +187,7 @@ This is a repository of all the tutorials of [The Python Code](https://www.thepy
- [How to Query the Ethereum Blockchain with Python](https://www.thepythoncode.com/article/query-ethereum-blockchain-with-python). ([code](general/query-ethereum))
- [Data Cleaning with Pandas in Python](https://www.thepythoncode.com/article/data-cleaning-using-pandas-in-python). ([code](general/data-cleaning-pandas))
- [How to Minify CSS with Python](https://www.thepythoncode.com/article/minimize-css-files-in-python). ([code](general/minify-css))
+ - [Build a real MCP client and server in Python with FastMCP (Todo Manager example)](https://www.thepythoncode.com/article/fastmcp-mcp-client-server-todo-manager). ([code](general/fastmcp-mcp-client-server-todo-manager))
@@ -200,6 +210,7 @@ This is a repository of all the tutorials of [The Python Code](https://www.thepy
- [How to Extract Google Trends Data in Python](https://www.thepythoncode.com/article/extract-google-trends-data-in-python). ([code](web-scraping/extract-google-trends-data))
- [How to Make a YouTube Video Downloader in Python](https://www.thepythoncode.com/article/make-a-youtube-video-downloader-in-python). ([code](web-scraping/youtube-video-downloader))
- [How to Build a YouTube Audio Downloader in Python](https://www.thepythoncode.com/article/build-a-youtube-mp3-downloader-tkinter-python). ([code](web-scraping/youtube-mp3-downloader))
+ - [YouTube Video Transcription Summarization with Python](https://thepythoncode.com/article/youtube-video-transcription-and-summarization-with-python). ([code](web-scraping/youtube-transcript-summarizer/))
- ### [Python Standard Library](https://www.thepythoncode.com/topic/python-standard-library)
- [How to Transfer Files in the Network using Sockets in Python](https://www.thepythoncode.com/article/send-receive-files-using-sockets-python). ([code](general/transfer-files/))
@@ -285,6 +296,7 @@ This is a repository of all the tutorials of [The Python Code](https://www.thepy
- [How to Compress Images in Python](https://www.thepythoncode.com/article/compress-images-in-python). ([code](python-for-multimedia/compress-image))
- [How to Remove Metadata from an Image in Python](https://thepythoncode.com/article/how-to-clear-image-metadata-in-python). ([code](python-for-multimedia/remove-metadata-from-images))
- [How to Create Videos from Images in Python](https://thepythoncode.com/article/create-a-video-from-images-opencv-python). ([code](python-for-multimedia/create-video-from-images))
+ - [How to Recover Deleted Files with Python](https://thepythoncode.com/article/how-to-recover-deleted-file-with-python). ([code](python-for-multimedia/recover-deleted-files))
- ### [Web Programming](https://www.thepythoncode.com/topic/web-programming)
- [Detecting Fraudulent Transactions in a Streaming Application using Kafka in Python](https://www.thepythoncode.com/article/detect-fraudulent-transactions-with-apache-kafka-in-python). ([code](general/detect-fraudulent-transactions))
diff --git a/ethical-hacking/get-wifi-passwords/README.md b/ethical-hacking/get-wifi-passwords/README.md
index e24eda7f..a10efc10 100644
--- a/ethical-hacking/get-wifi-passwords/README.md
+++ b/ethical-hacking/get-wifi-passwords/README.md
@@ -1 +1,3 @@
-# [How to Extract Saved WiFi Passwords in Python](https://www.thepythoncode.com/article/extract-saved-wifi-passwords-in-python)
\ No newline at end of file
+# [How to Extract Saved WiFi Passwords in Python](https://www.thepythoncode.com/article/extract-saved-wifi-passwords-in-python)
+
+This program lists saved Wi-Fi networks and their passwords on Windows and Linux machines. In addition to the SSID (Wi-Fi network name) and passwords, the output also shows the network’s security type and ciphers.
\ No newline at end of file
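+
+On Windows, the script shells out to `netsh` under the hood; you can preview a single profile yourself with the same command it parses (replace `YOUR_SSID` with one of your saved network names):
+
+```
+netsh wlan show profile "YOUR_SSID" key=clear
+```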
diff --git a/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py b/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py
index 0afd70ca..ff32f6f8 100644
--- a/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py
+++ b/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py
@@ -28,10 +28,16 @@ def get_windows_saved_wifi_passwords(verbose=1):
[list]: list of extracted profiles, a profile has the fields ["ssid", "ciphers", "key"]
"""
ssids = get_windows_saved_ssids()
- Profile = namedtuple("Profile", ["ssid", "ciphers", "key"])
+ Profile = namedtuple("Profile", ["ssid", "security", "ciphers", "key"])
profiles = []
for ssid in ssids:
ssid_details = subprocess.check_output(f"""netsh wlan show profile "{ssid}" key=clear""").decode()
+
+ #get the security type
+ security = re.findall(r"Authentication\s(.*)", ssid_details)
+ # clear spaces and colon
+ security = "/".join(dict.fromkeys(c.strip().strip(":").strip() for c in security))
+
# get the ciphers
ciphers = re.findall(r"Cipher\s(.*)", ssid_details)
# clear spaces and colon
@@ -43,7 +49,7 @@ def get_windows_saved_wifi_passwords(verbose=1):
key = key[0].strip().strip(":").strip()
except IndexError:
key = "None"
- profile = Profile(ssid=ssid, ciphers=ciphers, key=key)
+ profile = Profile(ssid=ssid, security=security, ciphers=ciphers, key=key)
if verbose >= 1:
print_windows_profile(profile)
profiles.append(profile)
@@ -52,12 +58,13 @@ def get_windows_saved_wifi_passwords(verbose=1):
def print_windows_profile(profile):
"""Prints a single profile on Windows"""
- print(f"{profile.ssid:25}{profile.ciphers:15}{profile.key:50}")
+ #print(f"{profile.ssid:25}{profile.ciphers:15}{profile.key:50}")
+ print(f"{profile.ssid:25}{profile.security:30}{profile.ciphers:35}{profile.key:50}")
def print_windows_profiles(verbose):
"""Prints all extracted SSIDs along with Key on Windows"""
- print("SSID CIPHER(S) KEY")
+ print("SSID Securities CIPHER(S) KEY")
print("-"*50)
get_windows_saved_wifi_passwords(verbose)
diff --git a/ethical-hacking/http-security-headers/README.md b/ethical-hacking/http-security-headers/README.md
new file mode 100644
index 00000000..e0e7b1d0
--- /dev/null
+++ b/ethical-hacking/http-security-headers/README.md
@@ -0,0 +1,2 @@
+Grab your API key from OpenRouter: https://openrouter.ai/
+The default model is DeepSeek V3.1 (free); feel free to try other models.
\ No newline at end of file
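+
+To run it (a minimal example; the key below is a placeholder and the target URL is up to you):
+
+```bash
+pip install -r requirements.txt
+export OPENROUTER_API_KEY="your-key-here"
+python http_security_headers.py example.com --export results.json
+```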
diff --git a/ethical-hacking/http-security-headers/http_security_headers.py b/ethical-hacking/http-security-headers/http_security_headers.py
new file mode 100644
index 00000000..67b494c4
--- /dev/null
+++ b/ethical-hacking/http-security-headers/http_security_headers.py
@@ -0,0 +1,149 @@
+#!/usr/bin/env python3
+import requests
+import json
+import os
+import argparse
+from typing import Dict, List, Tuple
+from openai import OpenAI
+
+class SecurityHeadersAnalyzer:
+ def __init__(self, api_key: str = None, base_url: str = None, model: str = None):
+ self.api_key = api_key or os.getenv('OPENROUTER_API_KEY') or os.getenv('OPENAI_API_KEY')
+        self.base_url = base_url or os.getenv('OPENROUTER_BASE_URL', 'https://openrouter.ai/api/v1')
+ self.model = model or os.getenv('LLM_MODEL', 'deepseek/deepseek-chat-v3.1:free')
+
+ if not self.api_key:
+ raise ValueError("API key is required. Set OPENROUTER_API_KEY or provide --api-key")
+
+ self.client = OpenAI(base_url=self.base_url, api_key=self.api_key)
+
+ def fetch_headers(self, url: str, timeout: int = 10) -> Tuple[Dict[str, str], int]:
+ """Fetch HTTP headers from URL"""
+ if not url.startswith(('http://', 'https://')):
+ url = 'https://' + url
+
+ try:
+ response = requests.get(url, timeout=timeout, allow_redirects=True)
+ return dict(response.headers), response.status_code
+ except requests.exceptions.RequestException as e:
+ print(f"Error fetching {url}: {e}")
+ return {}, 0
+
+ def analyze_headers(self, url: str, headers: Dict[str, str], status_code: int) -> str:
+ """Analyze headers using LLM"""
+ prompt = f"""Analyze the HTTP security headers for {url} (Status: {status_code})
+
+Headers:
+{json.dumps(headers, indent=2)}
+
+Provide a comprehensive security analysis including:
+1. Security score (0-100) and overall assessment
+2. Critical security issues that need immediate attention
+3. Missing important security headers
+4. Analysis of existing security headers and their effectiveness
+5. Specific recommendations for improvement
+6. Potential security risks based on current configuration
+
+Focus on practical, actionable advice following current web security best practices. Please do not include ** or #
+in the response except where strictly necessary; use numbers, Roman numerals, or letters instead, and format the response cleanly. """
+
+ try:
+ completion = self.client.chat.completions.create(
+ model=self.model,
+ messages=[{"role": "user", "content": prompt}],
+ temperature=0.2
+ )
+ return completion.choices[0].message.content
+ except Exception as e:
+ return f"Analysis failed: {e}"
+
+    def analyze_url(self, url: str, timeout: int = 10) -> Dict:
+ """Analyze a single URL"""
+ print(f"\nAnalyzing: {url}")
+ print("-" * 50)
+
+ headers, status_code = self.fetch_headers(url, timeout)
+ if not headers:
+ return {"url": url, "error": "Failed to fetch headers"}
+
+ print(f"Status Code: {status_code}")
+ print(f"\nHTTP Headers ({len(headers)} found):")
+ print("-" * 30)
+ for key, value in headers.items():
+ print(f"{key}: {value}")
+
+ print(f"\nAnalyzing with AI...")
+ analysis = self.analyze_headers(url, headers, status_code)
+
+ print("\nSECURITY ANALYSIS")
+ print("=" * 50)
+ print(analysis)
+
+ return {
+ "url": url,
+ "status_code": status_code,
+ "headers_count": len(headers),
+ "analysis": analysis,
+ "raw_headers": headers
+ }
+
+ def analyze_multiple_urls(self, urls: List[str], timeout: int = 10) -> List[Dict]:
+ """Analyze multiple URLs"""
+ results = []
+ for i, url in enumerate(urls, 1):
+ print(f"\n[{i}/{len(urls)}]")
+            result = self.analyze_url(url, timeout)
+ results.append(result)
+ return results
+
+ def export_results(self, results: List[Dict], filename: str):
+ """Export results to JSON"""
+ with open(filename, 'w') as f:
+ json.dump(results, f, indent=2, ensure_ascii=False)
+ print(f"\nResults exported to: {filename}")
+
+def main():
+ parser = argparse.ArgumentParser(
+ description='Analyze HTTP security headers using AI',
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog='''Examples:
+  python http_security_headers.py https://example.com
+  python http_security_headers.py example.com google.com
+  python http_security_headers.py example.com --export results.json
+
+Environment Variables:
+ OPENROUTER_API_KEY - API key for OpenRouter
+ OPENAI_API_KEY - API key for OpenAI
+ LLM_MODEL - Model to use (default: deepseek/deepseek-chat-v3.1:free)'''
+ )
+
+ parser.add_argument('urls', nargs='+', help='URLs to analyze')
+ parser.add_argument('--api-key', help='API key for LLM service')
+ parser.add_argument('--base-url', help='Base URL for LLM API')
+ parser.add_argument('--model', help='LLM model to use')
+ parser.add_argument('--timeout', type=int, default=10, help='Request timeout (default: 10s)')
+ parser.add_argument('--export', help='Export results to JSON file')
+
+ args = parser.parse_args()
+
+ try:
+ analyzer = SecurityHeadersAnalyzer(
+ api_key=args.api_key,
+ base_url=args.base_url,
+ model=args.model
+ )
+
+ results = analyzer.analyze_multiple_urls(args.urls, args.timeout)
+
+ if args.export:
+ analyzer.export_results(results, args.export)
+
+ except ValueError as e:
+ print(f"Error: {e}")
+ return 1
+ except KeyboardInterrupt:
+ print("\nAnalysis interrupted by user")
+ return 1
+
+if __name__ == '__main__':
+    raise SystemExit(main())
\ No newline at end of file
diff --git a/ethical-hacking/http-security-headers/requirements.txt b/ethical-hacking/http-security-headers/requirements.txt
new file mode 100644
index 00000000..f0dd0aec
--- /dev/null
+++ b/ethical-hacking/http-security-headers/requirements.txt
@@ -0,0 +1 @@
+openai
+requests
\ No newline at end of file
diff --git a/general/fastmcp-mcp-client-server-todo-manager/README.md b/general/fastmcp-mcp-client-server-todo-manager/README.md
new file mode 100644
index 00000000..dd988428
--- /dev/null
+++ b/general/fastmcp-mcp-client-server-todo-manager/README.md
@@ -0,0 +1,39 @@
+# Build a real MCP client and server in Python with FastMCP (Todo Manager example)
+
+This folder contains the code that accompanies the article:
+
+- Article: https://www.thepythoncode.com/article/fastmcp-mcp-client-server-todo-manager
+
+What’s included
+- `todo_server.py`: FastMCP MCP server exposing tools, resources, and a prompt for a Todo Manager.
+- `todo_client_test.py`: A small client script that connects to the server and exercises all features.
+- `requirements.txt`: Python dependencies for this tutorial.
+
+Quick start
+1) Install requirements
+```bash
+python -m venv .venv && source .venv/bin/activate # or use your preferred env manager
+pip install -r requirements.txt
+```
+
+2) Run the server (stdio transport by default)
+```bash
+python todo_server.py
+```
+
+3) In a separate terminal, run the client
+```bash
+python todo_client_test.py
+```
+
+Optional: run the server over HTTP
+- In `todo_server.py`, replace the last line with:
+```python
+mcp.run(transport="http", host="127.0.0.1", port=8000)
+```
+- Then change the client constructor to `Client("http://127.0.0.1:8000/mcp")`.
+
+Notes
+- Requires Python 3.10+.
+- The example uses in-memory storage for simplicity.
+- For production tips (HTTPS, auth, containerization), see the article.
diff --git a/general/fastmcp-mcp-client-server-todo-manager/requirements.txt b/general/fastmcp-mcp-client-server-todo-manager/requirements.txt
new file mode 100644
index 00000000..2c9387f7
--- /dev/null
+++ b/general/fastmcp-mcp-client-server-todo-manager/requirements.txt
@@ -0,0 +1 @@
+fastmcp>=2.12
\ No newline at end of file
diff --git a/general/fastmcp-mcp-client-server-todo-manager/todo_client_test.py b/general/fastmcp-mcp-client-server-todo-manager/todo_client_test.py
new file mode 100644
index 00000000..f01a1e78
--- /dev/null
+++ b/general/fastmcp-mcp-client-server-todo-manager/todo_client_test.py
@@ -0,0 +1,50 @@
+import asyncio
+from fastmcp import Client
+
+async def main():
+ # Option A: Connect to local Python script (stdio)
+ client = Client("todo_server.py")
+
+ # Option B: In-memory (for tests)
+ # from todo_server import mcp
+ # client = Client(mcp)
+
+ async with client:
+ await client.ping()
+ print("[OK] Connected")
+
+ # Create a few todos
+ t1 = await client.call_tool("create_todo", {"title": "Write README", "priority": "high"})
+ t2 = await client.call_tool("create_todo", {"title": "Refactor utils", "description": "Split helpers into modules"})
+ t3 = await client.call_tool("create_todo", {"title": "Add tests", "priority": "low"})
+ print("Created IDs:", t1.data["id"], t2.data["id"], t3.data["id"])
+
+ # List open
+ open_list = await client.call_tool("list_todos", {"status": "open"})
+ print("Open IDs:", [t["id"] for t in open_list.data["items"]])
+
+ # Complete one
+ updated = await client.call_tool("complete_todo", {"todo_id": t2.data["id"]})
+ print("Completed:", updated.data["id"], "status:", updated.data["status"])
+
+ # Search
+ found = await client.call_tool("search_todos", {"query": "readme"})
+ print("Search 'readme':", [t["id"] for t in found.data["items"]])
+
+ # Resources
+ stats = await client.read_resource("stats://todos")
+ print("Stats:", getattr(stats[0], "text", None) or stats[0])
+
+ todo2 = await client.read_resource(f"todo://{t2.data['id']}")
+ print("todo://{id}:", getattr(todo2[0], "text", None) or todo2[0])
+
+ # Prompt
+ prompt_msgs = await client.get_prompt("suggest_next_action", {"pending": 2, "project": "MCP tutorial"})
+ msgs_pretty = [
+ {"role": m.role, "content": getattr(m, "content", None) or getattr(m, "text", None)}
+ for m in getattr(prompt_msgs, "messages", [])
+ ]
+ print("Prompt messages:", msgs_pretty)
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/general/fastmcp-mcp-client-server-todo-manager/todo_server.py b/general/fastmcp-mcp-client-server-todo-manager/todo_server.py
new file mode 100644
index 00000000..64f99b73
--- /dev/null
+++ b/general/fastmcp-mcp-client-server-todo-manager/todo_server.py
@@ -0,0 +1,88 @@
+from typing import Literal
+from itertools import count
+from datetime import datetime, timezone
+from fastmcp import FastMCP
+
+# In-memory storage for demo purposes
+TODOS: list[dict] = []
+_id = count(start=1)
+
+mcp = FastMCP(name="Todo Manager")
+
+@mcp.tool
+def create_todo(
+ title: str,
+ description: str = "",
+ priority: Literal["low", "medium", "high"] = "medium",
+) -> dict:
+ """Create a todo (id, title, status, priority, timestamps)."""
+ todo = {
+ "id": next(_id),
+ "title": title,
+ "description": description,
+ "priority": priority,
+ "status": "open",
+ "created_at": datetime.now(timezone.utc).isoformat(),
+ "completed_at": None,
+ }
+ TODOS.append(todo)
+ return todo
+
+@mcp.tool
+def list_todos(status: Literal["open", "done", "all"] = "open") -> dict:
+ """List todos by status ('open' | 'done' | 'all')."""
+ if status == "all":
+ items = TODOS
+ elif status == "open":
+ items = [t for t in TODOS if t["status"] == "open"]
+ else:
+ items = [t for t in TODOS if t["status"] == "done"]
+ return {"items": items}
+
+@mcp.tool
+def complete_todo(todo_id: int) -> dict:
+ """Mark a todo as done."""
+ for t in TODOS:
+ if t["id"] == todo_id:
+ t["status"] = "done"
+ t["completed_at"] = datetime.now(timezone.utc).isoformat()
+ return t
+ raise ValueError(f"Todo {todo_id} not found")
+
+@mcp.tool
+def search_todos(query: str) -> dict:
+ """Case-insensitive search in title/description."""
+ q = query.lower().strip()
+ items = [t for t in TODOS if q in t["title"].lower() or q in t["description"].lower()]
+ return {"items": items}
+
+# Read-only resources
+@mcp.resource("stats://todos")
+def todo_stats() -> dict:
+ """Aggregated stats: total, open, done."""
+ total = len(TODOS)
+ open_count = sum(1 for t in TODOS if t["status"] == "open")
+ done_count = total - open_count
+ return {"total": total, "open": open_count, "done": done_count}
+
+@mcp.resource("todo://{id}")
+def get_todo(id: int) -> dict:
+ """Fetch a single todo by id."""
+ for t in TODOS:
+ if t["id"] == id:
+ return t
+ raise ValueError(f"Todo {id} not found")
+
+# A reusable prompt
+@mcp.prompt
+def suggest_next_action(pending: int, project: str | None = None) -> str:
+ """Render a small instruction for an LLM to propose next action."""
+ base = f"You have {pending} pending TODOs. "
+ if project:
+ base += f"They relate to the project '{project}'. "
+ base += "Suggest the most impactful next action in one short sentence."
+ return base
+
+if __name__ == "__main__":
+ # Default transport is stdio; you can also use transport="http", host=..., port=...
+ mcp.run()
diff --git a/general/interactive-weather-plot/interactive_weather_plot.py b/general/interactive-weather-plot/interactive_weather_plot.py
index b4d17141..3d1ea566 100644
--- a/general/interactive-weather-plot/interactive_weather_plot.py
+++ b/general/interactive-weather-plot/interactive_weather_plot.py
@@ -68,7 +68,7 @@ def changeLocation(newLocation):
# Making the Radio Buttons
buttons = RadioButtons(
ax=plt.axes([0.1, 0.1, 0.2, 0.2]),
- labels=locations.keys()
+ labels=list(locations.keys())
)
# Connect click event on the buttons to the function that changes location.
@@ -86,4 +86,4 @@ def changeLocation(newLocation):
plt.savefig('file.svg', format='svg')
-plt.show()
\ No newline at end of file
+plt.show()
diff --git a/gui-programming/rich-text-editor/rich_text_editor.py b/gui-programming/rich-text-editor/rich_text_editor.py
index 10c14263..05259905 100644
--- a/gui-programming/rich-text-editor/rich_text_editor.py
+++ b/gui-programming/rich-text-editor/rich_text_editor.py
@@ -112,9 +112,9 @@ def fileManager(event=None, action=None):
document['tags'][tagName] = []
ranges = textArea.tag_ranges(tagName)
-
- for i, tagRange in enumerate(ranges[::2]):
- document['tags'][tagName].append([str(tagRange), str(ranges[i+1])])
+
+ for i in range(0, len(ranges), 2):
+ document['tags'][tagName].append([str(ranges[i]), str(ranges[i + 1])])
if not filePath:
# ask the user for a filename with the native file explorer.
diff --git a/handling-pdf-files/pdf-compressor/README.md b/handling-pdf-files/pdf-compressor/README.md
index 4527174c..307f105c 100644
--- a/handling-pdf-files/pdf-compressor/README.md
+++ b/handling-pdf-files/pdf-compressor/README.md
@@ -1,8 +1,48 @@
# [How to Compress PDF Files in Python](https://www.thepythoncode.com/article/compress-pdf-files-in-python)
-To run this:
-- `pip3 install -r requirements.txt`
-- To compress `bert-paper.pdf` file:
- ```
- $ python pdf_compressor.py bert-paper.pdf bert-paper-min.pdf
- ```
- This will spawn a new compressed PDF file under the name `bert-paper-min.pdf`.
+
+This directory contains two approaches:
+
+- Legacy (commercial): `pdf_compressor.py` uses PDFTron/PDFNet. PDFNet now requires a license key and the old pip package is not freely available, so this may not work without a license.
+- Recommended (open source): `pdf_compressor_ghostscript.py` uses Ghostscript to compress PDFs.
+
+## Ghostscript method (recommended)
+
+Prerequisite: Install Ghostscript
+
+- macOS (Homebrew):
+ - `brew install ghostscript`
+- Ubuntu/Debian:
+ - `sudo apt-get update && sudo apt-get install -y ghostscript`
+- Windows:
+ - Download and install from https://ghostscript.com/releases/
+ - Ensure `gswin64c.exe` (or `gswin32c.exe`) is in your PATH.
+
+No Python packages are required for this method, only Ghostscript.
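+
+For reference, the script essentially builds and runs a Ghostscript command like the following (shown with the default `power=2`, i.e. `/printer`, and the example file names used below):
+
+```
+gs -sDEVICE=pdfwrite -dCompatibilityLevel=1.4 -dPDFSETTINGS=/printer -dNOPAUSE -dBATCH -dQUIET -sOutputFile=bert-paper-min.pdf bert-paper.pdf
+```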
+
+### Usage
+
+To compress `bert-paper.pdf` into `bert-paper-min.pdf` with default quality (`power=2`):
+
+```
+python pdf_compressor_ghostscript.py bert-paper.pdf bert-paper-min.pdf
+```
+
+Optional quality level `[power]` controls compression/quality tradeoff (maps to Ghostscript `-dPDFSETTINGS`):
+
+- 0 = `/screen` (smallest, lowest quality)
+- 1 = `/ebook` (good quality)
+- 2 = `/printer` (high quality) [default]
+- 3 = `/prepress` (very high quality)
+- 4 = `/default` (Ghostscript default)
+
+Example:
+
+```
+python pdf_compressor_ghostscript.py bert-paper.pdf bert-paper-min.pdf 1
+```
+
+In testing, `bert-paper.pdf` (~757 KB) compressed to ~407 KB with `power=1`.
+
+## Legacy PDFNet method (requires license)
+
+If you have a valid license and the PDFNet SDK installed, you can use the original `pdf_compressor.py` script. Note that the previously referenced `PDFNetPython3` pip package is not freely available and may not install via pip. Refer to the vendor's documentation for installation and licensing.
\ No newline at end of file
diff --git a/handling-pdf-files/pdf-compressor/pdf_compressor_ghostscript.py b/handling-pdf-files/pdf-compressor/pdf_compressor_ghostscript.py
new file mode 100644
index 00000000..88de4062
--- /dev/null
+++ b/handling-pdf-files/pdf-compressor/pdf_compressor_ghostscript.py
@@ -0,0 +1,103 @@
+import os
+import sys
+import subprocess
+import shutil
+
+
+def get_size_format(b, factor=1024, suffix="B"):
+ for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
+ if b < factor:
+ return f"{b:.2f}{unit}{suffix}"
+ b /= factor
+ return f"{b:.2f}Y{suffix}"
+
+
+def find_ghostscript_executable():
+ candidates = [
+ shutil.which('gs'),
+ shutil.which('gswin64c'),
+ shutil.which('gswin32c'),
+ ]
+ for c in candidates:
+ if c:
+ return c
+ return None
+
+
+def compress_file(input_file: str, output_file: str, power: int = 2):
+ """Compress PDF using Ghostscript.
+
+ power:
+ 0 -> /screen (lowest quality, highest compression)
+ 1 -> /ebook (good quality)
+ 2 -> /printer (high quality) [default]
+ 3 -> /prepress (very high quality)
+ 4 -> /default (Ghostscript default)
+ """
+ if not os.path.exists(input_file):
+ raise FileNotFoundError(f"Input file not found: {input_file}")
+ if not output_file:
+ output_file = input_file
+
+ initial_size = os.path.getsize(input_file)
+
+ gs = find_ghostscript_executable()
+ if not gs:
+ raise RuntimeError(
+ "Ghostscript not found. Install it and ensure 'gs' (Linux/macOS) "
+ "or 'gswin64c'/'gswin32c' (Windows) is in PATH."
+ )
+
+ settings_map = {
+ 0: '/screen',
+ 1: '/ebook',
+ 2: '/printer',
+ 3: '/prepress',
+ 4: '/default',
+ }
+ pdfsettings = settings_map.get(power, '/printer')
+
+ cmd = [
+ gs,
+ '-sDEVICE=pdfwrite',
+ '-dCompatibilityLevel=1.4',
+ f'-dPDFSETTINGS={pdfsettings}',
+ '-dNOPAUSE',
+ '-dBATCH',
+ '-dQUIET',
+ f'-sOutputFile={output_file}',
+ input_file,
+ ]
+
+ try:
+ subprocess.run(cmd, check=True)
+ except subprocess.CalledProcessError as e:
+ print(f"Ghostscript failed: {e}")
+ return False
+
+ compressed_size = os.path.getsize(output_file)
+ ratio = 1 - (compressed_size / initial_size)
+ summary = {
+ "Input File": input_file,
+ "Initial Size": get_size_format(initial_size),
+ "Output File": output_file,
+ "Compressed Size": get_size_format(compressed_size),
+ "Compression Ratio": f"{ratio:.3%}",
+ }
+
+ print("## Summary ########################################################")
+ for k, v in summary.items():
+ print(f"{k}: {v}")
+ print("###################################################################")
+ return True
+
+
+if __name__ == '__main__':
+ if len(sys.argv) < 3:
+ print("Usage: python pdf_compressor_ghostscript.py [power 0-4]")
+ sys.exit(1)
+ input_file = sys.argv[1]
+ output_file = sys.argv[2]
+ power = int(sys.argv[3]) if len(sys.argv) > 3 else 2
+ ok = compress_file(input_file, output_file, power)
+ sys.exit(0 if ok else 2)
\ No newline at end of file
diff --git a/handling-pdf-files/pdf-compressor/requirements.txt b/handling-pdf-files/pdf-compressor/requirements.txt
index 0a664a86..9f6e5337 100644
--- a/handling-pdf-files/pdf-compressor/requirements.txt
+++ b/handling-pdf-files/pdf-compressor/requirements.txt
@@ -1 +1,7 @@
-PDFNetPython3==8.1.0
\ No newline at end of file
+# No Python dependencies required for Ghostscript-based compressor.
+# System dependency: Ghostscript
+# - macOS: brew install ghostscript
+# - Debian: sudo apt-get install -y ghostscript
+# - Windows: https://ghostscript.com/releases/
+#
+# The legacy script (pdf_compressor.py) depends on PDFNet (commercial) and a license key.
\ No newline at end of file
diff --git a/images/codingfleet-banner-2.png b/images/codingfleet-banner-2.png
new file mode 100644
index 00000000..e95c4d27
Binary files /dev/null and b/images/codingfleet-banner-2.png differ
diff --git a/images/codingfleet-banner-3.png b/images/codingfleet-banner-3.png
new file mode 100644
index 00000000..9f27495e
Binary files /dev/null and b/images/codingfleet-banner-3.png differ
diff --git a/images/iproyal-1.png b/images/iproyal-1.png
new file mode 100644
index 00000000..9e607e13
Binary files /dev/null and b/images/iproyal-1.png differ
diff --git a/python-for-multimedia/compress-image/README.md b/python-for-multimedia/compress-image/README.md
index 32f51450..919414cc 100644
--- a/python-for-multimedia/compress-image/README.md
+++ b/python-for-multimedia/compress-image/README.md
@@ -1,4 +1,56 @@
-# [How to Compress Images in Python](https://www.thepythoncode.com/article/compress-images-in-python)
-To run this:
-- `pip3 install -r requirements.txt`
-- `python compress_image.py --help`
\ No newline at end of file
+# [How to Compress Images in Python](https://www.thepythoncode.com/article/compress-images-in-python)
+
+Advanced Image Compressor with Batch Processing
+
+This script provides advanced image compression and resizing features using Python and Pillow.
+
+## Features
+
+- Batch processing of multiple images or directories
+- Lossy and lossless compression (PNG/WebP)
+- Optional JPEG conversion
+- Resize by ratio or explicit dimensions
+- Preserve or strip metadata (EXIF)
+- Custom output directory
+- Progress bar using `tqdm`
+- Detailed logging
+
+## Requirements
+
+- Python 3.6+
+- [Pillow](https://pypi.org/project/Pillow/)
+- [tqdm](https://pypi.org/project/tqdm/)
+
+Install dependencies:
+
+```bash
+pip install pillow tqdm
+```
+
+## Usage
+
+```bash
+python compress_image.py [options] <input1> [<input2> ...]
+```
+
+## Options
+- `-o`, `--output-dir`: Output directory (default: same as input)
+- `-q`, `--quality`: Compression quality (0-100, default: 85)
+- `-r`, `--resize-ratio`: Resize ratio (0-1, default: 1.0)
+- `-w`, `--width`: Output width (requires `--height`)
+- `-hh`, `--height`: Output height (requires `--width`)
+- `-j`, `--to-jpg`: Convert output to JPEG
+- `-m`, `--no-metadata`: Strip metadata (default: preserve)
+- `-l`, `--lossless`: Use lossless compression (PNG/WEBP)
+
+## Examples
+
+```bash
+python compress_image.py image.jpg -r 0.5 -q 80 -j
+python compress_image.py images/ -o output/ -m
+python compress_image.py image.png -l
+```
+
+## License
+
+MIT License.
diff --git a/python-for-multimedia/compress-image/compress_image.py b/python-for-multimedia/compress-image/compress_image.py
index 6560b887..f1696aa0 100644
--- a/python-for-multimedia/compress-image/compress_image.py
+++ b/python-for-multimedia/compress-image/compress_image.py
@@ -1,88 +1,104 @@
import os
from PIL import Image
+import argparse
+import logging
+from tqdm import tqdm
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
def get_size_format(b, factor=1024, suffix="B"):
- """
- Scale bytes to its proper byte format
- e.g:
- 1253656 => '1.20MB'
- 1253656678 => '1.17GB'
- """
+ """Scale bytes to its proper byte format."""
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
if b < factor:
return f"{b:.2f}{unit}{suffix}"
b /= factor
return f"{b:.2f}Y{suffix}"
-
-
-def compress_img(image_name, new_size_ratio=0.9, quality=90, width=None, height=None, to_jpg=True):
- # load the image to memory
- img = Image.open(image_name)
- # print the original image shape
- print("[*] Image shape:", img.size)
- # get the original image size in bytes
- image_size = os.path.getsize(image_name)
- # print the size before compression/resizing
- print("[*] Size before compression:", get_size_format(image_size))
- if new_size_ratio < 1.0:
- # if resizing ratio is below 1.0, then multiply width & height with this ratio to reduce image size
- img = img.resize((int(img.size[0] * new_size_ratio), int(img.size[1] * new_size_ratio)), Image.LANCZOS)
- # print new image shape
- print("[+] New Image shape:", img.size)
- elif width and height:
- # if width and height are set, resize with them instead
- img = img.resize((width, height), Image.LANCZOS)
- # print new image shape
- print("[+] New Image shape:", img.size)
- # split the filename and extension
- filename, ext = os.path.splitext(image_name)
- # make new filename appending _compressed to the original file name
- if to_jpg:
- # change the extension to JPEG
- new_filename = f"{filename}_compressed.jpg"
- else:
- # retain the same extension of the original image
- new_filename = f"{filename}_compressed{ext}"
+def compress_image(
+ input_path,
+ output_dir=None,
+ quality=85,
+ resize_ratio=1.0,
+ width=None,
+ height=None,
+ to_jpg=False,
+ preserve_metadata=True,
+ lossless=False,
+):
+ """Compress an image with advanced options."""
try:
- # save the image with the corresponding quality and optimize set to True
- img.save(new_filename, quality=quality, optimize=True)
- except OSError:
- # convert the image to RGB mode first
- img = img.convert("RGB")
- # save the image with the corresponding quality and optimize set to True
- img.save(new_filename, quality=quality, optimize=True)
- print("[+] New file saved:", new_filename)
- # get the new image size in bytes
- new_image_size = os.path.getsize(new_filename)
- # print the new size in a good format
- print("[+] Size after compression:", get_size_format(new_image_size))
- # calculate the saving bytes
- saving_diff = new_image_size - image_size
- # print the saving percentage
- print(f"[+] Image size change: {saving_diff/image_size*100:.2f}% of the original image size.")
-
-
+ img = Image.open(input_path)
+ logger.info(f"[*] Processing: {os.path.basename(input_path)}")
+ logger.info(f"[*] Original size: {get_size_format(os.path.getsize(input_path))}")
+
+ # Resize if needed
+ if resize_ratio < 1.0:
+ new_size = (int(img.size[0] * resize_ratio), int(img.size[1] * resize_ratio))
+ img = img.resize(new_size, Image.LANCZOS)
+ logger.info(f"[+] Resized to: {new_size}")
+ elif width and height:
+ img = img.resize((width, height), Image.LANCZOS)
+ logger.info(f"[+] Resized to: {width}x{height}")
+
+ # Prepare output path
+ filename, ext = os.path.splitext(os.path.basename(input_path))
+ output_ext = ".jpg" if to_jpg else ext
+ output_filename = f"{filename}_compressed{output_ext}"
+ output_path = os.path.join(output_dir or os.path.dirname(input_path), output_filename)
+
+ # Save with options
+ save_kwargs = {"quality": quality, "optimize": True}
+ if not preserve_metadata:
+ save_kwargs["exif"] = b"" # Strip metadata
+ if lossless and ext.lower() in (".png", ".webp"):
+ save_kwargs["lossless"] = True
+
+ try:
+ img.save(output_path, **save_kwargs)
+ except OSError:
+ img = img.convert("RGB")
+ img.save(output_path, **save_kwargs)
+
+ logger.info(f"[+] Saved to: {output_path}")
+ logger.info(f"[+] New size: {get_size_format(os.path.getsize(output_path))}")
+ except Exception as e:
+ logger.error(f"[!] Error processing {input_path}: {e}")
+
+def batch_compress(
+ input_paths,
+ output_dir=None,
+ quality=85,
+ resize_ratio=1.0,
+ width=None,
+ height=None,
+ to_jpg=False,
+ preserve_metadata=True,
+ lossless=False,
+):
+ """Compress multiple images."""
+ if output_dir and not os.path.exists(output_dir):
+ os.makedirs(output_dir, exist_ok=True)
+ for path in tqdm(input_paths, desc="Compressing images"):
+ compress_image(path, output_dir, quality, resize_ratio, width, height, to_jpg, preserve_metadata, lossless)
+
if __name__ == "__main__":
- import argparse
- parser = argparse.ArgumentParser(description="Simple Python script for compressing and resizing images")
- parser.add_argument("image", help="Target image to compress and/or resize")
- parser.add_argument("-j", "--to-jpg", action="https://github.com/store_true", help="Whether to convert the image to the JPEG format")
- parser.add_argument("-q", "--quality", type=int, help="Quality ranging from a minimum of 0 (worst) to a maximum of 95 (best). Default is 90", default=90)
- parser.add_argument("-r", "--resize-ratio", type=float, help="Resizing ratio from 0 to 1, setting to 0.5 will multiply width & height of the image by 0.5. Default is 1.0", default=1.0)
- parser.add_argument("-w", "--width", type=int, help="The new width image, make sure to set it with the `height` parameter")
- parser.add_argument("-hh", "--height", type=int, help="The new height for the image, make sure to set it with the `width` parameter")
+ parser = argparse.ArgumentParser(description="Advanced Image Compressor with Batch Processing")
+ parser.add_argument("input", nargs='+', help="Input image(s) or directory")
+ parser.add_argument("-o", "--output-dir", help="Output directory (default: same as input)")
+ parser.add_argument("-q", "--quality", type=int, default=85, help="Compression quality (0-100)")
+ parser.add_argument("-r", "--resize-ratio", type=float, default=1.0, help="Resize ratio (0-1)")
+ parser.add_argument("-w", "--width", type=int, help="Output width (requires --height)")
+ parser.add_argument("-hh", "--height", type=int, help="Output height (requires --width)")
+ parser.add_argument("-j", "--to-jpg", action="https://github.com/store_true", help="Convert output to JPEG")
+ parser.add_argument("-m", "--no-metadata", action="https://github.com/store_false", help="Strip metadata")
+ parser.add_argument("-l", "--lossless", action="https://github.com/store_true", help="Use lossless compression (PNG/WEBP)")
+
args = parser.parse_args()
- # print the passed arguments
- print("="*50)
- print("[*] Image:", args.image)
- print("[*] To JPEG:", args.to_jpg)
- print("[*] Quality:", args.quality)
- print("[*] Resizing ratio:", args.resize_ratio)
- if args.width and args.height:
- print("[*] Width:", args.width)
- print("[*] Height:", args.height)
- print("="*50)
- # compress the image
- compress_img(args.image, args.resize_ratio, args.quality, args.width, args.height, args.to_jpg)
\ No newline at end of file
+ input_paths = []
+ for path in args.input:
+        if os.path.isdir(path):
+            input_paths.extend(os.path.join(path, f) for f in os.listdir(path)
+                               if f.lower().endswith((".jpg", ".jpeg", ".png", ".webp")))
+        else:
+            input_paths.append(path)
+    if not input_paths:
+        logger.error("No valid images found!")
+        exit(1)
+    batch_compress(input_paths, args.output_dir, args.quality, args.resize_ratio,
+                   args.width, args.height, args.to_jpg, args.no_metadata, args.lossless)
diff --git a/python-for-multimedia/recover-deleted-files/README.md b/python-for-multimedia/recover-deleted-files/README.md
new file mode 100644
index 00000000..9b57b100
--- /dev/null
+++ b/python-for-multimedia/recover-deleted-files/README.md
@@ -0,0 +1 @@
+# [How to Recover Deleted Files with Python](https://thepythoncode.com/article/how-to-recover-deleted-file-with-python)
\ No newline at end of file
diff --git a/python-for-multimedia/recover-deleted-files/file_recovery.py b/python-for-multimedia/recover-deleted-files/file_recovery.py
new file mode 100644
index 00000000..057995c4
--- /dev/null
+++ b/python-for-multimedia/recover-deleted-files/file_recovery.py
@@ -0,0 +1,552 @@
+
+import os
+import sys
+import argparse
+import struct
+import time
+import logging
+import subprocess
+import signal
+from datetime import datetime, timedelta
+from pathlib import Path
+import binascii
+
+# File signatures (magic numbers) for common file types
+FILE_SIGNATURES = {
+ 'jpg': [bytes([0xFF, 0xD8, 0xFF, 0xE0]), bytes([0xFF, 0xD8, 0xFF, 0xE1])],
+ 'png': [bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A])],
+ 'gif': [bytes([0x47, 0x49, 0x46, 0x38, 0x37, 0x61]), bytes([0x47, 0x49, 0x46, 0x38, 0x39, 0x61])],
+ 'pdf': [bytes([0x25, 0x50, 0x44, 0x46])],
+ 'zip': [bytes([0x50, 0x4B, 0x03, 0x04])],
+ 'docx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])], # More specific signature
+ 'xlsx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])], # More specific signature
+ 'pptx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])], # More specific signature
+ 'mp3': [bytes([0x49, 0x44, 0x33])],
+ 'mp4': [bytes([0x00, 0x00, 0x00, 0x18, 0x66, 0x74, 0x79, 0x70])],
+ 'avi': [bytes([0x52, 0x49, 0x46, 0x46])],
+}
+
+# Additional validation patterns to check after finding the signature
+# This helps reduce false positives
+VALIDATION_PATTERNS = {
+ 'docx': [b'word/', b'[Content_Types].xml'],
+ 'xlsx': [b'xl/', b'[Content_Types].xml'],
+ 'pptx': [b'ppt/', b'[Content_Types].xml'],
+ 'zip': [b'PK\x01\x02'], # Central directory header
+ 'pdf': [b'obj', b'endobj'],
+}
+
+# File endings (trailer signatures) for some file types
+FILE_TRAILERS = {
+ 'jpg': bytes([0xFF, 0xD9]),
+ 'png': bytes([0x49, 0x45, 0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82]),
+ 'gif': bytes([0x00, 0x3B]),
+ 'pdf': bytes([0x25, 0x25, 0x45, 0x4F, 0x46]),
+}
+
+# Maximum file sizes to prevent recovering corrupted files
+MAX_FILE_SIZES = {
+ 'jpg': 30 * 1024 * 1024, # 30MB
+ 'png': 50 * 1024 * 1024, # 50MB
+ 'gif': 20 * 1024 * 1024, # 20MB
+ 'pdf': 100 * 1024 * 1024, # 100MB
+ 'zip': 200 * 1024 * 1024, # 200MB
+ 'docx': 50 * 1024 * 1024, # 50MB
+ 'xlsx': 50 * 1024 * 1024, # 50MB
+ 'pptx': 100 * 1024 * 1024, # 100MB
+ 'mp3': 50 * 1024 * 1024, # 50MB
+ 'mp4': 1024 * 1024 * 1024, # 1GB
+ 'avi': 1024 * 1024 * 1024, # 1GB
+}
+
+class FileRecoveryTool:
+ def __init__(self, source, output_dir, file_types=None, deep_scan=False,
+ block_size=512, log_level=logging.INFO, skip_existing=True,
+ max_scan_size=None, timeout_minutes=None):
+ """
+ Initialize the file recovery tool
+
+ Args:
+ source (str): Path to the source device or directory
+ output_dir (str): Directory to save recovered files
+ file_types (list): List of file types to recover
+ deep_scan (bool): Whether to perform a deep scan
+ block_size (int): Block size for reading data
+ log_level (int): Logging level
+ skip_existing (bool): Skip existing files in output directory
+ max_scan_size (int): Maximum number of bytes to scan
+ timeout_minutes (int): Timeout in minutes
+ """
+ self.source = source
+ self.output_dir = Path(output_dir)
+ self.file_types = file_types if file_types else list(FILE_SIGNATURES.keys())
+ self.deep_scan = deep_scan
+ self.block_size = block_size
+ self.skip_existing = skip_existing
+ self.max_scan_size = max_scan_size
+ self.timeout_minutes = timeout_minutes
+ self.timeout_reached = False
+
+ # Setup logging
+ self.setup_logging(log_level)
+
+ # Create output directory if it doesn't exist
+ self.output_dir.mkdir(parents=True, exist_ok=True)
+
+ # Statistics
+ self.stats = {
+ 'total_files_recovered': 0,
+ 'recovered_by_type': {},
+ 'start_time': time.time(),
+ 'bytes_scanned': 0,
+ 'false_positives': 0
+ }
+
+ for file_type in self.file_types:
+ self.stats['recovered_by_type'][file_type] = 0
+
+ def setup_logging(self, log_level):
+ """Set up logging configuration"""
+ logging.basicConfig(
+ level=log_level,
+ format='%(asctime)s - %(levelname)s - %(message)s',
+ handlers=[
+ logging.StreamHandler(),
+ logging.FileHandler(f"recovery_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
+ ]
+ )
+ self.logger = logging.getLogger('file_recovery')
+
+ def _setup_timeout(self):
+ """Set up a timeout handler"""
+ if self.timeout_minutes:
+ def timeout_handler(signum, frame):
+ self.logger.warning(f"Timeout of {self.timeout_minutes} minutes reached!")
+ self.timeout_reached = True
+
+ # Set the timeout
+ signal.signal(signal.SIGALRM, timeout_handler)
+ signal.alarm(int(self.timeout_minutes * 60))
+
+ def get_device_size(self):
+ """Get the size of the device or file"""
+ if os.path.isfile(self.source):
+ # Regular file
+ return os.path.getsize(self.source)
+ else:
+ # Block device
+ try:
+ # Try using blockdev command (Linux)
+ result = subprocess.run(['blockdev', '--getsize64', self.source],
+ capture_output=True, text=True, check=True)
+ return int(result.stdout.strip())
+ except (subprocess.SubprocessError, FileNotFoundError):
+ try:
+ # Try using ioctl (requires root)
+ import fcntl
+ with open(self.source, 'rb') as fd:
+ # BLKGETSIZE64 = 0x80081272
+ buf = bytearray(8)
+ fcntl.ioctl(fd, 0x80081272, buf)
+ return struct.unpack('L', buf)[0]
+ except:
+ # Last resort: try to seek to the end
+ try:
+ with open(self.source, 'rb') as fd:
+ fd.seek(0, 2) # Seek to end
+ return fd.tell()
+ except:
+ self.logger.warning("Could not determine device size. Using fallback size.")
+ # Fallback to a reasonable size for testing
+ return 1024 * 1024 * 1024 # 1GB
+
+ def scan_device(self):
+ """Scan the device for deleted files"""
+ self.logger.info(f"Starting scan of {self.source}")
+ self.logger.info(f"Looking for file types: {', '.join(self.file_types)}")
+
+ try:
+ # Get device size
+ device_size = self.get_device_size()
+ self.logger.info(f"Device size: {self._format_size(device_size)}")
+
+ # Set up timeout if specified
+ if self.timeout_minutes:
+ self._setup_timeout()
+ self.logger.info(f"Timeout set for {self.timeout_minutes} minutes")
+
+ with open(self.source, 'rb', buffering=0) as device: # buffering=0 for direct I/O
+ self._scan_device_data(device, device_size)
+
+ except (IOError, OSError) as e:
+ self.logger.error(f"Error accessing source: {e}")
+ return False
+
+ self._print_summary()
+ return True
+
+ def _scan_device_data(self, device, device_size):
+ """Scan the device data for file signatures"""
+ position = 0
+
+ # Limit scan size if specified
+ if self.max_scan_size and self.max_scan_size < device_size:
+ self.logger.info(f"Limiting scan to first {self._format_size(self.max_scan_size)} of device")
+ device_size = self.max_scan_size
+
+ # Create subdirectories for each file type
+ for file_type in self.file_types:
+ (self.output_dir / file_type).mkdir(exist_ok=True)
+
+ scan_start_time = time.time()
+ last_progress_time = scan_start_time
+
+ # Read the device in blocks
+ while position < device_size:
+ # Check if timeout reached
+ if self.timeout_reached:
+ self.logger.warning("Stopping scan due to timeout")
+ break
+
+ try:
+ # Seek to position first
+ device.seek(position)
+
+ # Read a block of data
+ data = device.read(self.block_size)
+ if not data:
+ break
+
+ self.stats['bytes_scanned'] += len(data)
+
+ # Check for file signatures in this block
+ for file_type in self.file_types:
+ signatures = FILE_SIGNATURES.get(file_type, [])
+
+ for signature in signatures:
+ sig_pos = data.find(signature)
+
+ if sig_pos != -1:
+ # Found a file signature, try to recover the file
+ absolute_pos = position + sig_pos
+ device.seek(absolute_pos)
+
+ self.logger.debug(f"Found {file_type} signature at position {absolute_pos}")
+
+ # Recover the file
+ if self._recover_file(device, file_type, absolute_pos):
+ self.stats['total_files_recovered'] += 1
+ self.stats['recovered_by_type'][file_type] += 1
+ else:
+ self.stats['false_positives'] += 1
+
+ # Reset position to continue scanning
+ device.seek(position + self.block_size)
+
+ # Update position and show progress
+ position += self.block_size
+ current_time = time.time()
+
+ # Show progress every 5MB or 10 seconds, whichever comes first
+ if (position % (5 * 1024 * 1024) == 0) or (current_time - last_progress_time >= 10):
+ percent = (position / device_size) * 100 if device_size > 0 else 0
+ elapsed = current_time - self.stats['start_time']
+
+ # Calculate estimated time remaining
+ if position > 0 and device_size > 0:
+ bytes_per_second = position / elapsed if elapsed > 0 else 0
+ remaining_bytes = device_size - position
+ eta_seconds = remaining_bytes / bytes_per_second if bytes_per_second > 0 else 0
+ eta_str = str(timedelta(seconds=int(eta_seconds)))
+ else:
+ eta_str = "unknown"
+
+ self.logger.info(f"Progress: {percent:.2f}% ({self._format_size(position)} / {self._format_size(device_size)}) - "
+ f"{self.stats['total_files_recovered']} files recovered - "
+ f"Elapsed: {timedelta(seconds=int(elapsed))} - ETA: {eta_str}")
+ last_progress_time = current_time
+
+ except Exception as e:
+ self.logger.error(f"Error reading at position {position}: {e}")
+ position += self.block_size # Skip this block and continue
+
+ def _validate_file_content(self, data, file_type):
+ """
+ Additional validation to reduce false positives
+
+ Args:
+ data: File data to validate
+ file_type: Type of file to validate
+
+ Returns:
+ bool: True if file content appears valid
+ """
+ # Check minimum size
+ if len(data) < 100:
+ return False
+
+ # Check for validation patterns
+ patterns = VALIDATION_PATTERNS.get(file_type, [])
+ if patterns:
+ for pattern in patterns:
+ if pattern in data:
+ return True
+ return False # None of the patterns were found
+
+ # For file types without specific validation patterns
+ return True
+
+ def _recover_file(self, device, file_type, start_position):
+ """
+ Recover a file of the given type starting at the given position
+
+ Args:
+ device: Open file handle to the device
+ file_type: Type of file to recover
+ start_position: Starting position of the file
+
+ Returns:
+ bool: True if file was recovered successfully
+ """
+ max_size = MAX_FILE_SIZES.get(file_type, 10 * 1024 * 1024) # Default to 10MB
+ trailer = FILE_TRAILERS.get(file_type)
+
+ # Generate a unique filename
+ filename = f"{file_type}_{start_position}_{int(time.time())}_{binascii.hexlify(os.urandom(4)).decode()}.{file_type}"
+ output_path = self.output_dir / file_type / filename
+
+ if self.skip_existing and output_path.exists():
+ self.logger.debug(f"Skipping existing file: {output_path}")
+ return False
+
+ # Save the current position to restore later
+ current_pos = device.tell()
+
+ try:
+ # Seek to the start of the file
+ device.seek(start_position)
+
+ # Read the file data
+ if trailer and self.deep_scan:
+ # If we know the trailer and deep scan is enabled, read until trailer
+ file_data = self._read_until_trailer(device, trailer, max_size)
+ else:
+ # Otherwise, use heuristics to determine file size
+ file_data = self._read_file_heuristic(device, file_type, max_size)
+
+ if not file_data or len(file_data) < 100: # Ignore very small files
+ return False
+
+ # Additional validation to reduce false positives
+ if not self._validate_file_content(file_data, file_type):
+ self.logger.debug(f"Skipping invalid {file_type} file at position {start_position}")
+ return False
+
+ # Write the recovered file
+ with open(output_path, 'wb') as f:
+ f.write(file_data)
+
+ self.logger.info(f"Recovered {file_type} file: {filename} ({self._format_size(len(file_data))})")
+ return True
+
+ except Exception as e:
+ self.logger.error(f"Error recovering file at position {start_position}: {e}")
+ return False
+ finally:
+ # Restore the original position
+ try:
+ device.seek(current_pos)
+ except:
+ pass # Ignore seek errors in finally block
+
+ def _read_until_trailer(self, device, trailer, max_size):
+ """Read data until a trailer signature is found or max size is reached"""
+ buffer = bytearray()
+ chunk_size = 4096
+
+ while len(buffer) < max_size:
+ try:
+ chunk = device.read(chunk_size)
+ if not chunk:
+ break
+
+ buffer.extend(chunk)
+
+ # Check if trailer is in the buffer
+ trailer_pos = buffer.find(trailer, max(0, len(buffer) - len(trailer) - chunk_size))
+ if trailer_pos != -1:
+ # Found trailer, return data up to and including the trailer
+ return buffer[:trailer_pos + len(trailer)]
+ except Exception as e:
+ self.logger.error(f"Error reading chunk: {e}")
+ break
+
+ # If we reached max size without finding a trailer, return what we have
+ return buffer if len(buffer) > 100 else None
+
+ def _read_file_heuristic(self, device, file_type, max_size):
+ """
+ Use heuristics to determine file size when trailer is unknown
+ This is a simplified approach - real tools use more sophisticated methods
+ """
+ buffer = bytearray()
+ chunk_size = 4096
+ valid_chunks = 0
+ invalid_chunks = 0
+
+ # For Office documents and ZIP files, read a larger initial chunk to validate
+ initial_chunk_size = 16384 if file_type in ['docx', 'xlsx', 'pptx', 'zip'] else chunk_size
+
+ # Read initial chunk for validation
+ initial_chunk = device.read(initial_chunk_size)
+ if not initial_chunk:
+ return None
+
+ buffer.extend(initial_chunk)
+
+ # For Office documents, check if it contains required elements
+ if file_type in ['docx', 'xlsx', 'pptx', 'zip']:
+ # Basic validation for Office Open XML files
+ if file_type == 'docx' and b'word/' not in initial_chunk:
+ return None
+ if file_type == 'xlsx' and b'xl/' not in initial_chunk:
+ return None
+ if file_type == 'pptx' and b'ppt/' not in initial_chunk:
+ return None
+ if file_type == 'zip' and b'PK\x01\x02' not in initial_chunk:
+ return None
+
+ # Continue reading chunks
+ while len(buffer) < max_size:
+ try:
+ chunk = device.read(chunk_size)
+ if not chunk:
+ break
+
+ buffer.extend(chunk)
+
+ # Simple heuristic: for binary files, check if chunk contains too many non-printable characters
+ # This is a very basic approach and would need to be refined for real-world use
+ if file_type in ['jpg', 'png', 'gif', 'pdf', 'zip', 'docx', 'xlsx', 'pptx', 'mp3', 'mp4', 'avi']:
+ # For binary files, we continue reading until we hit max size or end of device
+ valid_chunks += 1
+
+ # For ZIP-based formats, check for corruption
+ if file_type in ['zip', 'docx', 'xlsx', 'pptx'] and b'PK' not in chunk and valid_chunks > 10:
+ # If we've read several chunks and don't see any more PK signatures, we might be past the file
+ invalid_chunks += 1
+
+ else:
+ # For text files, we could check for text validity
+ printable_ratio = sum(32 <= b <= 126 or b in (9, 10, 13) for b in chunk) / len(chunk)
+ if printable_ratio < 0.7: # If less than 70% printable characters
+ invalid_chunks += 1
+ else:
+ valid_chunks += 1
+
+ # If we have too many invalid chunks in a row, stop
+ if invalid_chunks > 3:
+ return buffer[:len(buffer) - (invalid_chunks * chunk_size)]
+ except Exception as e:
+ self.logger.error(f"Error reading chunk in heuristic: {e}")
+ break
+
+ return buffer
+
+ def _format_size(self, size_bytes):
+ """Format size in bytes to a human-readable string"""
+ for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
+ if size_bytes < 1024 or unit == 'TB':
+ return f"{size_bytes:.2f} {unit}"
+ size_bytes /= 1024
+
+ def _print_summary(self):
+ """Print a summary of the recovery operation"""
+ elapsed = time.time() - self.stats['start_time']
+
+ self.logger.info("=" * 50)
+ self.logger.info("Recovery Summary")
+ self.logger.info("=" * 50)
+ self.logger.info(f"Total files recovered: {self.stats['total_files_recovered']}")
+ self.logger.info(f"False positives detected and skipped: {self.stats['false_positives']}")
+ self.logger.info(f"Total data scanned: {self._format_size(self.stats['bytes_scanned'])}")
+ self.logger.info(f"Time elapsed: {timedelta(seconds=int(elapsed))}")
+ self.logger.info("Files recovered by type:")
+
+ for file_type, count in self.stats['recovered_by_type'].items():
+ if count > 0:
+ self.logger.info(f" - {file_type}: {count}")
+
+ if self.timeout_reached:
+ self.logger.info("Note: Scan was stopped due to timeout")
+
+ self.logger.info("=" * 50)
+
+
+def main():
+ """Main function to parse arguments and run the recovery tool"""
+ parser = argparse.ArgumentParser(description='File Recovery Tool - Recover deleted files from storage devices')
+
+ parser.add_argument('source', help='Source device or directory to recover files from (e.g., /dev/sdb, /media/usb)')
+ parser.add_argument('output', help='Directory to save recovered files')
+
+ parser.add_argument('-t', '--types', nargs='+', choices=FILE_SIGNATURES.keys(), default=None,
+ help='File types to recover (default: all supported types)')
+
+    parser.add_argument('-d', '--deep-scan', action='https://github.com/store_true',
+ help='Perform a deep scan (slower but more thorough)')
+
+ parser.add_argument('-b', '--block-size', type=int, default=512,
+ help='Block size for reading data (default: 512 bytes)')
+
+    parser.add_argument('-v', '--verbose', action='https://github.com/store_true',
+ help='Enable verbose output')
+
+    parser.add_argument('-q', '--quiet', action='https://github.com/store_true',
+ help='Suppress all output except errors')
+
+    parser.add_argument('--no-skip', action='https://github.com/store_true',
+ help='Do not skip existing files in output directory')
+
+ parser.add_argument('--max-size', type=int,
+ help='Maximum size to scan in MB (e.g., 1024 for 1GB)')
+
+ parser.add_argument('--timeout', type=int, default=None,
+ help='Stop scanning after specified minutes')
+
+ args = parser.parse_args()
+
+ # Set logging level based on verbosity
+ if args.quiet:
+ log_level = logging.ERROR
+ elif args.verbose:
+ log_level = logging.DEBUG
+ else:
+ log_level = logging.INFO
+
+ # Convert max size from MB to bytes if specified
+ max_scan_size = args.max_size * 1024 * 1024 if args.max_size else None
+
+ # Create and run the recovery tool
+ recovery_tool = FileRecoveryTool(
+ source=args.source,
+ output_dir=args.output,
+ file_types=args.types,
+ deep_scan=args.deep_scan,
+ block_size=args.block_size,
+ log_level=log_level,
+ skip_existing=not args.no_skip,
+ max_scan_size=max_scan_size,
+ timeout_minutes=args.timeout
+ )
+
+ try:
+ recovery_tool.scan_device()
+ except KeyboardInterrupt:
+ print("\nRecovery process interrupted by user.")
+ recovery_tool._print_summary()
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/web-scraping/youtube-extractor/extract_video_info.py b/web-scraping/youtube-extractor/extract_video_info.py
index 042ce4f8..bed184b0 100644
--- a/web-scraping/youtube-extractor/extract_video_info.py
+++ b/web-scraping/youtube-extractor/extract_video_info.py
@@ -1,92 +1,150 @@
-from requests_html import HTMLSession
-from bs4 import BeautifulSoup as bs
+import requests
+from bs4 import BeautifulSoup
import re
import json
-
-# init session
-session = HTMLSession()
-
+import argparse
def get_video_info(url):
- # download HTML code
- response = session.get(url)
- # execute Javascript
- response.html.render(timeout=60)
- # create beautiful soup object to parse HTML
- soup = bs(response.html.html, "html.parser")
- # open("index.html", "w").write(response.html.html)
- # initialize the result
- result = {}
- # video title
- result["title"] = soup.find("meta", itemprop="name")['content']
- # video views
- result["views"] = soup.find("meta", itemprop="interactionCount")['content']
- # video description
- result["description"] = soup.find("meta", itemprop="description")['content']
- # date published
- result["date_published"] = soup.find("meta", itemprop="datePublished")['content']
- # get the duration of the video
- result["duration"] = soup.find("span", {"class": "ytp-time-duration"}).text
- # get the video tags
- result["tags"] = ', '.join([ meta.attrs.get("content") for meta in soup.find_all("meta", {"property": "og:video:tag"}) ])
-
- # Additional video and channel information (with help from: https://stackoverflow.com/a/68262735)
- data = re.search(r"var ytInitialData = ({.*?});", soup.prettify()).group(1)
- data_json = json.loads(data)
- videoPrimaryInfoRenderer = data_json['contents']['twoColumnWatchNextResults']['results']['results']['contents'][0]['videoPrimaryInfoRenderer']
- videoSecondaryInfoRenderer = data_json['contents']['twoColumnWatchNextResults']['results']['results']['contents'][1]['videoSecondaryInfoRenderer']
- # number of likes
- likes_label = videoPrimaryInfoRenderer['videoActions']['menuRenderer']['topLevelButtons'][0]['toggleButtonRenderer']['defaultText']['accessibility']['accessibilityData']['label'] # "No likes" or "###,### likes"
- likes_str = likes_label.split(' ')[0].replace(',','')
- result["likes"] = '0' if likes_str == 'No' else likes_str
- # number of likes (old way) doesn't always work
- # text_yt_formatted_strings = soup.find_all("yt-formatted-string", {"id": "text", "class": "ytd-toggle-button-renderer"})
- # result["likes"] = ''.join([ c for c in text_yt_formatted_strings[0].attrs.get("aria-label") if c.isdigit() ])
- # result["likes"] = 0 if result['likes'] == '' else int(result['likes'])
- # number of dislikes - YouTube does not publish this anymore...
- # result["dislikes"] = ''.join([ c for c in text_yt_formatted_strings[1].attrs.get("aria-label") if c.isdigit() ])
- # result["dislikes"] = '0' if result['dislikes'] == '' else result['dislikes']
- result['dislikes'] = 'UNKNOWN'
- # channel details
- channel_tag = soup.find("meta", itemprop="channelId")['content']
- # channel name
- channel_name = soup.find("span", itemprop="author").next.next['content']
- # channel URL
- # channel_url = soup.find("span", itemprop="author").next['href']
- channel_url = f"https://www.youtube.com/%7Bchannel_tag%7D"
- # number of subscribers as str
- channel_subscribers = videoSecondaryInfoRenderer['owner']['videoOwnerRenderer']['subscriberCountText']['accessibility']['accessibilityData']['label']
- # channel details (old way)
- # channel_tag = soup.find("yt-formatted-string", {"class": "ytd-channel-name"}).find("a")
- # # channel name (old way)
- # channel_name = channel_tag.text
- # # channel URL (old way)
- # channel_url = f"https://www.youtube.com{channel_tag['href']}"
- # number of subscribers as str (old way)
- # channel_subscribers = soup.find("yt-formatted-string", {"id": "owner-sub-count"}).text.strip()
- result['channel'] = {'name': channel_name, 'url': channel_url, 'subscribers': channel_subscribers}
- return result
+    """
+    Extract video information from a YouTube watch page by parsing the
+    embedded ytInitialData JSON (no JavaScript rendering required).
+    """
+ headers = {
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
+ }
+
+ try:
+ # Download HTML code
+ response = requests.get(url, headers=headers)
+ response.raise_for_status()
+
+ # Create beautiful soup object to parse HTML
+ soup = BeautifulSoup(response.text, "html.parser")
+
+ # Initialize the result
+ result = {}
+
+ # Extract ytInitialData which contains all the video information
+ data_match = re.search(r'var ytInitialData = ({.*?});', response.text)
+ if not data_match:
+ raise Exception("Could not find ytInitialData in page")
+
+ data_json = json.loads(data_match.group(1))
+
+ # Get the main content sections
+ contents = data_json['contents']['twoColumnWatchNextResults']['results']['results']['contents']
+
+ # Extract video information from videoPrimaryInfoRenderer
+ if 'videoPrimaryInfoRenderer' in contents[0]:
+ primary = contents[0]['videoPrimaryInfoRenderer']
+
+ # Video title
+ result["title"] = primary['title']['runs'][0]['text']
+
+ # Video views
+ result["views"] = primary['viewCount']['videoViewCountRenderer']['viewCount']['simpleText']
+
+ # Date published
+ result["date_published"] = primary['dateText']['simpleText']
+
+ # Extract channel information from videoSecondaryInfoRenderer
+ secondary = None
+ if 'videoSecondaryInfoRenderer' in contents[1]:
+ secondary = contents[1]['videoSecondaryInfoRenderer']
+ owner = secondary['owner']['videoOwnerRenderer']
+
+ # Channel name
+ channel_name = owner['title']['runs'][0]['text']
+
+ # Channel ID
+ channel_id = owner['navigationEndpoint']['browseEndpoint']['browseId']
+
+            # Channel URL (built from the channel ID with the /channel/ path)
+ channel_url = f"https://www.youtube.com/channel/%7Bchannel_id%7D"
+
+ # Number of subscribers
+ channel_subscribers = owner['subscriberCountText']['accessibility']['accessibilityData']['label']
+
+ result['channel'] = {
+ 'name': channel_name,
+ 'url': channel_url,
+ 'subscribers': channel_subscribers
+ }
+
+ # Extract video description
+ if secondary and 'attributedDescription' in secondary:
+ description_runs = secondary['attributedDescription']['content']
+ result["description"] = description_runs
+ else:
+ result["description"] = "Description not available"
+
+ # Try to extract video duration from player overlay
+ # This is a fallback approach since the original method doesn't work
+ duration_match = re.search(r'"approxDurationMs":"(\d+)"', response.text)
+ if duration_match:
+ duration_ms = int(duration_match.group(1))
+ minutes = duration_ms // 60000
+ seconds = (duration_ms % 60000) // 1000
+ result["duration"] = f"{minutes}:{seconds:02d}"
+ else:
+ result["duration"] = "Duration not available"
+
+ # Extract video tags if available
+ video_tags = []
+ if 'keywords' in data_json.get('metadata', {}).get('videoMetadataRenderer', {}):
+ video_tags = data_json['metadata']['videoMetadataRenderer']['keywords']
+ result["tags"] = ', '.join(video_tags) if video_tags else "No tags available"
+
+ # Extract likes (modern approach)
+ result["likes"] = "Likes count not available"
+ result["dislikes"] = "UNKNOWN" # YouTube no longer shows dislikes
+
+ # Try to find likes in the new structure
+        for content in contents:
+            toggle = (content.get('compositeVideoPrimaryInfoRenderer', {})
+                             .get('likeButton', {})
+                             .get('toggleButtonRenderer', {}))
+            label = (toggle.get('defaultText', {})
+                           .get('accessibility', {})
+                           .get('accessibilityData', {})
+                           .get('label', ''))
+            if label and 'like' in label.lower():
+                result["likes"] = label
+
+ return result
+
+ except Exception as e:
+        raise Exception(f"Error extracting video info: {str(e)}") from e
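+
+# Example usage (illustrative): python extract_video_info.py "https://www.youtube.com/watch?v=%3Cvideo-id%3E"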
if __name__ == "__main__":
- import argparse
parser = argparse.ArgumentParser(description="YouTube Video Data Extractor")
parser.add_argument("url", help="URL of the YouTube video")
args = parser.parse_args()
+
# parse the video URL from command line
url = args.url
- data = get_video_info(url)
+ try:
+ data = get_video_info(url)
- # print in nice format
- print(f"Title: {data['title']}")
- print(f"Views: {data['views']}")
- print(f"Published at: {data['date_published']}")
- print(f"Video Duration: {data['duration']}")
- print(f"Video tags: {data['tags']}")
- print(f"Likes: {data['likes']}")
- print(f"Dislikes: {data['dislikes']}")
- print(f"\nDescription: {data['description']}\n")
- print(f"\nChannel Name: {data['channel']['name']}")
- print(f"Channel URL: {data['channel']['url']}")
- print(f"Channel Subscribers: {data['channel']['subscribers']}")
+ # print in nice format
+ print(f"Title: {data['title']}")
+ print(f"Views: {data['views']}")
+ print(f"Published at: {data['date_published']}")
+ print(f"Video Duration: {data['duration']}")
+ print(f"Video tags: {data['tags']}")
+ print(f"Likes: {data['likes']}")
+ print(f"Dislikes: {data['dislikes']}")
+ print(f"\nDescription: {data['description']}\n")
+ print(f"\nChannel Name: {data['channel']['name']}")
+ print(f"Channel URL: {data['channel']['url']}")
+ print(f"Channel Subscribers: {data['channel']['subscribers']}")
+
+ except Exception as e:
+ print(f"Error: {e}")
+ print("\nNote: YouTube frequently changes its structure, so this script may need updates.")
\ No newline at end of file
diff --git a/web-scraping/youtube-transcript-summarizer/README.md b/web-scraping/youtube-transcript-summarizer/README.md
new file mode 100644
index 00000000..a3df25a0
--- /dev/null
+++ b/web-scraping/youtube-transcript-summarizer/README.md
@@ -0,0 +1 @@
+# [YouTube Video Transcription Summarization with Python](https://thepythoncode.com/article/youtube-video-transcription-and-summarization-with-python)
\ No newline at end of file
diff --git a/web-scraping/youtube-transcript-summarizer/requirements.txt b/web-scraping/youtube-transcript-summarizer/requirements.txt
new file mode 100644
index 00000000..865ee3b5
--- /dev/null
+++ b/web-scraping/youtube-transcript-summarizer/requirements.txt
@@ -0,0 +1,5 @@
+nltk
+pytube
+youtube_transcript_api
+colorama
+openai
diff --git a/web-scraping/youtube-transcript-summarizer/youtube_transcript_summarizer.py b/web-scraping/youtube-transcript-summarizer/youtube_transcript_summarizer.py
new file mode 100644
index 00000000..bdb80f54
--- /dev/null
+++ b/web-scraping/youtube-transcript-summarizer/youtube_transcript_summarizer.py
@@ -0,0 +1,319 @@
+import os
+import sys
+import nltk
+import pytube
+from youtube_transcript_api import YouTubeTranscriptApi
+from nltk.corpus import stopwords
+from nltk.tokenize import sent_tokenize, word_tokenize
+from nltk.probability import FreqDist
+from heapq import nlargest
+from urllib.parse import urlparse, parse_qs
+import textwrap
+from colorama import Fore, Back, Style, init
+from openai import OpenAI
+
+# Initialize colorama for cross-platform colored terminal output
+init(autoreset=True)
+
+# Download necessary NLTK data
+nltk.download('punkt_tab', quiet=True)
+nltk.download('punkt', quiet=True)
+nltk.download('stopwords', quiet=True)
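+# ('punkt_tab' is the tokenizer data name used by newer NLTK releases; 'punkt' covers older ones)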
+
+# Initialize OpenAI client from environment variable
+# Expect the OpenRouter API key to be provided via OPENROUTER_API_KEY
+api_key = os.getenv("OPENROUTER_API_KEY")
+if not api_key:
+    print(Fore.RED + "Error: OPENROUTER_API_KEY environment variable is not set.")
+ sys.exit(1)
+else:
+ client = OpenAI(
+ base_url="https://openrouter.ai/api/v1",
+ api_key=api_key,
+ )
+
+def extract_video_id(youtube_url):
+ """Extract the video ID from a YouTube URL."""
+ parsed_url = urlparse(youtube_url)
+
+ if parsed_url.netloc == 'youtu.be':
+ return parsed_url.path[1:]
+
+ if parsed_url.netloc in ('www.youtube.com', 'youtube.com'):
+ if parsed_url.path == '/watch':
+ return parse_qs(parsed_url.query)['v'][0]
+ elif parsed_url.path.startswith('/embed/'):
+ return parsed_url.path.split('/')[2]
+ elif parsed_url.path.startswith('/v/'):
+ return parsed_url.path.split('/')[2]
+
+ # If no match found
+ raise ValueError(f"Could not extract video ID from URL: {youtube_url}")
+
+def get_transcript(video_id):
+ """Get the transcript of a YouTube video."""
+ try:
+ youtube_transcript_api = YouTubeTranscriptApi()
+ fetched_transcript = youtube_transcript_api.fetch(video_id)
+ full_transcript = " ".join([snippet.text for snippet in fetched_transcript.snippets])
+ return full_transcript.strip()
+ except Exception as e:
+ return f"Error retrieving transcript: {str(e)}."
+
+def summarize_text_nltk(text, num_sentences=5):
+ """Summarize text using frequency-based extractive summarization with NLTK."""
+ if not text or text.startswith("Error") or text.startswith("Transcript not available"):
+ return text
+
+ # Tokenize the text into sentences and words
+ sentences = sent_tokenize(text)
+
+ # If there are fewer sentences than requested, return all sentences
+ if len(sentences) <= num_sentences:
+ return text
+
+ # Tokenize words and remove stopwords
+ stop_words = set(stopwords.words('english'))
+ words = word_tokenize(text.lower())
+ words = [word for word in words if word.isalnum() and word not in stop_words]
+
+ # Calculate word frequencies
+ freq = FreqDist(words)
+
+ # Score sentences based on word frequencies
+ sentence_scores = {}
+ for i, sentence in enumerate(sentences):
+ for word in word_tokenize(sentence.lower()):
+ if word in freq:
+ if i in sentence_scores:
+ sentence_scores[i] += freq[word]
+ else:
+ sentence_scores[i] = freq[word]
+
+ # Get the top N sentences with highest scores
+ summary_sentences_indices = nlargest(num_sentences, sentence_scores, key=sentence_scores.get)
+ summary_sentences_indices.sort() # Sort to maintain original order
+
+ # Construct the summary
+ summary = ' '.join([sentences[i] for i in summary_sentences_indices])
+ return summary
+
+def summarize_text_ai(text, video_title, num_sentences=5):
+ """Summarize text using the Mistral AI model via OpenRouter."""
+ if not text or text.startswith("Error") or text.startswith("Transcript not available"):
+ return text
+
+ # Truncate text if it's too long (models often have token limits)
+ max_chars = 15000 # Adjust based on model's context window
+ truncated_text = text[:max_chars] if len(text) > max_chars else text
+
+ prompt = f"""Please provide a concise summary of the following YouTube video transcript.
+Title: {video_title}
+
+Transcript:
+{truncated_text}
+
+Create a clear, informative summary that captures the main points and key insights from the video.
+Your summary should be approximately {num_sentences} sentences long.
+"""
+
+ try:
+ completion = client.chat.completions.create(
+ model="mistralai/mistral-small-3.1-24b-instruct:free",
+ messages=[
+ {
+ "role": "user",
+ "content": [
+ {
+ "type": "text",
+ "text": prompt
+ }
+ ]
+ }
+ ]
+ )
+ return completion.choices[0].message.content
+ except Exception as e:
+ return f"Error generating AI summary: {str(e)}"
+
+def summarize_youtube_video(youtube_url, num_sentences=5):
+ """Main function to summarize a YouTube video's transcription."""
+ try:
+ video_id = extract_video_id(youtube_url)
+ transcript = get_transcript(video_id)
+
+ # Get video title for context
+        try:
+            yt = pytube.YouTube(youtube_url)
+            video_title = yt.title
+        except Exception:
+            video_title = "Unknown Title"
+
+ # Generate both summaries
+ print(Fore.YELLOW + f"Generating AI summary with {num_sentences} sentences...")
+ ai_summary = summarize_text_ai(transcript, video_title, num_sentences)
+
+ print(Fore.YELLOW + f"Generating NLTK summary with {num_sentences} sentences...")
+ nltk_summary = summarize_text_nltk(transcript, num_sentences)
+
+ return {
+ "video_title": video_title,
+ "video_id": video_id,
+ "ai_summary": ai_summary,
+ "nltk_summary": nltk_summary,
+ "full_transcript_length": len(transcript.split()),
+ "nltk_summary_length": len(nltk_summary.split()),
+ "ai_summary_length": len(ai_summary.split()) if not ai_summary.startswith("Error") else 0
+ }
+ except Exception as e:
+ return {"error": str(e)}
+
+def format_time(seconds):
+ """Convert seconds to a readable time format."""
+ hours, remainder = divmod(seconds, 3600)
+ minutes, seconds = divmod(remainder, 60)
+
+ if hours > 0:
+ return f"{hours}h {minutes}m {seconds}s"
+ elif minutes > 0:
+ return f"{minutes}m {seconds}s"
+ else:
+ return f"{seconds}s"
+
+def format_number(number):
+ """Format large numbers with commas for readability."""
+ return "{:,}".format(number)
+
+def print_boxed_text(text, width=80, title=None, color=Fore.WHITE):
+ """Print text in a nice box with optional title."""
+ wrapper = textwrap.TextWrapper(width=width-4) # -4 for the box margins
+ wrapped_text = wrapper.fill(text)
+ lines = wrapped_text.split('\n')
+
+ # Print top border with optional title
+ if title:
+        title_space = width - 2 - len(title)
+ left_padding = title_space // 2
+ right_padding = title_space - left_padding
+ print(color + '┌' + '─' * left_padding + title + '─' * right_padding + '┐')
+ else:
+ print(color + '┌' + '─' * (width-2) + '┐')
+
+ # Print content
+ for line in lines:
+        padding = width - 3 - len(line)
+ print(color + '│ ' + line + ' ' * padding + '│')
+
+ # Print bottom border
+ print(color + '└' + '─' * (width-2) + '┘')
+
+def print_summary_result(result, width=80):
+ """Print the summary result in a nicely formatted way."""
+ if "error" in result:
+ print_boxed_text(f"Error: {result['error']}", width=width, title="ERROR", color=Fore.RED)
+ return
+
+ # Terminal width
+ terminal_width = width
+
+ # Print header with video information
+ print("\n" + Fore.CYAN + "=" * terminal_width)
+ print(Fore.CYAN + Style.BRIGHT + result['video_title'].center(terminal_width))
+ print(Fore.CYAN + "=" * terminal_width + "\n")
+
+ # Video metadata section
+ print(Fore.YELLOW + Style.BRIGHT + "VIDEO INFORMATION".center(terminal_width))
+ print(Fore.YELLOW + "─" * terminal_width)
+
+ # Two-column layout for metadata
+ col_width = terminal_width // 2 - 2
+
+    # Video ID and URL
+ print(f"{Fore.GREEN}Video ID: {Fore.WHITE}{result['video_id']:<{col_width}}"
+ f"{Fore.GREEN}URL: {Fore.WHITE}https://youtu.be/{result['video_id']}")
+
+ print(Fore.YELLOW + "─" * terminal_width + "\n")
+
+ # AI Summary section
+ ai_compression = "N/A"
+ if result['ai_summary_length'] > 0:
+ ai_compression = round((1 - result['ai_summary_length'] / result['full_transcript_length']) * 100)
+
+ ai_summary_title = f" AI SUMMARY ({result['ai_summary_length']} words, condensed {ai_compression}% from {result['full_transcript_length']} words) "
+
+ print(Fore.GREEN + Style.BRIGHT + ai_summary_title.center(terminal_width))
+ print(Fore.GREEN + "─" * terminal_width)
+
+ # Print the AI summary with proper wrapping
+ wrapper = textwrap.TextWrapper(width=terminal_width-4,
+ initial_indent=' ',
+ subsequent_indent=' ')
+
+ # Split AI summary into paragraphs and print each
+ ai_paragraphs = result['ai_summary'].split('\n')
+ for paragraph in ai_paragraphs:
+ if paragraph.strip(): # Skip empty paragraphs
+ print(wrapper.fill(paragraph))
+ print() # Empty line between paragraphs
+
+ print(Fore.GREEN + "─" * terminal_width + "\n")
+
+ # NLTK Summary section
+ nltk_compression = round((1 - result['nltk_summary_length'] / result['full_transcript_length']) * 100)
+ nltk_summary_title = f" NLTK SUMMARY ({result['nltk_summary_length']} words, condensed {nltk_compression}% from {result['full_transcript_length']} words) "
+
+ print(Fore.MAGENTA + Style.BRIGHT + nltk_summary_title.center(terminal_width))
+ print(Fore.MAGENTA + "─" * terminal_width)
+
+ # Split NLTK summary into paragraphs and wrap each
+ paragraphs = result['nltk_summary'].split('. ')
+ formatted_paragraphs = []
+
+ current_paragraph = ""
+ for sentence in paragraphs:
+ if not sentence.endswith('.'):
+ sentence += '.'
+
+ if len(current_paragraph) + len(sentence) + 1 <= 150: # Arbitrary length for paragraph
+ current_paragraph += " " + sentence if current_paragraph else sentence
+ else:
+ if current_paragraph:
+ formatted_paragraphs.append(current_paragraph)
+ current_paragraph = sentence
+
+ if current_paragraph:
+ formatted_paragraphs.append(current_paragraph)
+
+ # Print each paragraph
+ for paragraph in formatted_paragraphs:
+ print(wrapper.fill(paragraph))
+ print() # Empty line between paragraphs
+
+ print(Fore.MAGENTA + "─" * terminal_width + "\n")
+
+
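+# Interactive usage (illustrative): export OPENROUTER_API_KEY in your shell, run
+# `python youtube_transcript_summarizer.py`, then paste a video URL when prompted.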
+if __name__ == "__main__":
+ # Get terminal width
+ try:
+ terminal_width = os.get_terminal_size().columns
+ # Limit width to reasonable range
+ terminal_width = max(80, min(terminal_width, 120))
+    except OSError:
+ terminal_width = 80 # Default if can't determine
+
+ # Print welcome banner
+ print(Fore.CYAN + Style.BRIGHT + "\n" + "=" * terminal_width)
+ print(Fore.CYAN + Style.BRIGHT + "YOUTUBE VIDEO SUMMARIZER".center(terminal_width))
+ print(Fore.CYAN + Style.BRIGHT + "=" * terminal_width + "\n")
+
+ youtube_url = input(Fore.GREEN + "Enter YouTube video URL: " + Fore.WHITE)
+
+ num_sentences_input = input(Fore.GREEN + "Enter number of sentences for summaries (default 5): " + Fore.WHITE)
+ num_sentences = int(num_sentences_input) if num_sentences_input.strip() else 5
+
+ print(Fore.YELLOW + "\nFetching and analyzing video transcript... Please wait...\n")
+
+ result = summarize_youtube_video(youtube_url, num_sentences)
+ print_summary_result(result, width=terminal_width)