diff --git a/README.md b/README.md index 3361147d..ddf5f25d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,11 @@ -IPRoyal Banner +

+ + CodingFleet Code Generator + + CodingFleet Code Converter + +

+ # Python Code Tutorials @@ -180,6 +187,7 @@ This is a repository of all the tutorials of [The Python Code](https://www.thepy - [How to Query the Ethereum Blockchain with Python](https://www.thepythoncode.com/article/query-ethereum-blockchain-with-python). ([code](general/query-ethereum)) - [Data Cleaning with Pandas in Python](https://www.thepythoncode.com/article/data-cleaning-using-pandas-in-python). ([code](general/data-cleaning-pandas)) - [How to Minify CSS with Python](https://www.thepythoncode.com/article/minimize-css-files-in-python). ([code](general/minify-css)) + - [Build a real MCP client and server in Python with FastMCP (Todo Manager example)](https://www.thepythoncode.com/article/fastmcp-mcp-client-server-todo-manager). ([code](general/fastmcp-mcp-client-server-todo-manager)) diff --git a/ethical-hacking/get-wifi-passwords/README.md b/ethical-hacking/get-wifi-passwords/README.md index e24eda7f..a10efc10 100644 --- a/ethical-hacking/get-wifi-passwords/README.md +++ b/ethical-hacking/get-wifi-passwords/README.md @@ -1 +1,3 @@ -# [How to Extract Saved WiFi Passwords in Python](https://www.thepythoncode.com/article/extract-saved-wifi-passwords-in-python) \ No newline at end of file +# [How to Extract Saved WiFi Passwords in Python](https://www.thepythoncode.com/article/extract-saved-wifi-passwords-in-python) + +This program lists saved Wi-Fi networks and their passwords on Windows and Linux machines. In addition to the SSID (Wi-Fi network name) and passwords, the output also shows the network’s security type and ciphers. \ No newline at end of file diff --git a/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py b/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py index 0afd70ca..ff32f6f8 100644 --- a/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py +++ b/ethical-hacking/get-wifi-passwords/get_wifi_passwords.py @@ -28,10 +28,16 @@ def get_windows_saved_wifi_passwords(verbose=1): [list]: list of extracted profiles, a profile has the fields ["ssid", "ciphers", "key"] """ ssids = get_windows_saved_ssids() - Profile = namedtuple("Profile", ["ssid", "ciphers", "key"]) + Profile = namedtuple("Profile", ["ssid", "security", "ciphers", "key"]) profiles = [] for ssid in ssids: ssid_details = subprocess.check_output(f"""netsh wlan show profile "{ssid}" key=clear""").decode() + + #get the security type + security = re.findall(r"Authentication\s(.*)", ssid_details) + # clear spaces and colon + security = "/".join(dict.fromkeys(c.strip().strip(":").strip() for c in security)) + # get the ciphers ciphers = re.findall(r"Cipher\s(.*)", ssid_details) # clear spaces and colon @@ -43,7 +49,7 @@ def get_windows_saved_wifi_passwords(verbose=1): key = key[0].strip().strip(":").strip() except IndexError: key = "None" - profile = Profile(ssid=ssid, ciphers=ciphers, key=key) + profile = Profile(ssid=ssid, security=security, ciphers=ciphers, key=key) if verbose >= 1: print_windows_profile(profile) profiles.append(profile) @@ -52,12 +58,13 @@ def get_windows_saved_wifi_passwords(verbose=1): def print_windows_profile(profile): """Prints a single profile on Windows""" - print(f"{profile.ssid:25}{profile.ciphers:15}{profile.key:50}") + #print(f"{profile.ssid:25}{profile.ciphers:15}{profile.key:50}") + print(f"{profile.ssid:25}{profile.security:30}{profile.ciphers:35}{profile.key:50}") def print_windows_profiles(verbose): """Prints all extracted SSIDs along with Key on Windows""" - print("SSID CIPHER(S) KEY") + print("SSID Securities CIPHER(S) KEY") print("-"*50) 
get_windows_saved_wifi_passwords(verbose) diff --git a/ethical-hacking/http-security-headers/README.md b/ethical-hacking/http-security-headers/README.md new file mode 100644 index 00000000..e0e7b1d0 --- /dev/null +++ b/ethical-hacking/http-security-headers/README.md @@ -0,0 +1,2 @@ +Grab your API key from OpenRouter: https://openrouter.ai/ +The model used is DeepSeek V3.1 (free). However, feel free to try others. \ No newline at end of file diff --git a/ethical-hacking/http-security-headers/http_security_headers.py b/ethical-hacking/http-security-headers/http_security_headers.py new file mode 100644 index 00000000..67b494c4 --- /dev/null +++ b/ethical-hacking/http-security-headers/http_security_headers.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +import requests +import json +import os +import argparse +from typing import Dict, List, Tuple +from openai import OpenAI + +class SecurityHeadersAnalyzer: + def __init__(self, api_key: str = None, base_url: str = None, model: str = None): + self.api_key = api_key or os.getenv('OPENROUTER_API_KEY') or os.getenv('OPENAI_API_KEY') + self.base_url = base_url or os.getenv('OPENROUTER_BASE_URL', 'https://openrouter.ai/api/v1') + self.model = model or os.getenv('LLM_MODEL', 'deepseek/deepseek-chat-v3.1:free') + + if not self.api_key: + raise ValueError("API key is required. Set OPENROUTER_API_KEY or provide --api-key") + + self.client = OpenAI(base_url=self.base_url, api_key=self.api_key) + + def fetch_headers(self, url: str, timeout: int = 10) -> Tuple[Dict[str, str], int]: + """Fetch HTTP headers from URL""" + if not url.startswith(('http://', 'https://')): + url = 'https://' + url + + try: + response = requests.get(url, timeout=timeout, allow_redirects=True) + return dict(response.headers), response.status_code + except requests.exceptions.RequestException as e: + print(f"Error fetching {url}: {e}") + return {}, 0 + + def analyze_headers(self, url: str, headers: Dict[str, str], status_code: int) -> str: + """Analyze headers using LLM""" + prompt = f"""Analyze the HTTP security headers for {url} (Status: {status_code}) + +Headers: +{json.dumps(headers, indent=2)} + +Provide a comprehensive security analysis including: +1. Security score (0-100) and overall assessment +2. Critical security issues that need immediate attention +3. Missing important security headers +4. Analysis of existing security headers and their effectiveness +5. Specific recommendations for improvement +6. Potential security risks based on current configuration + +Focus on practical, actionable advice following current web security best practices. Please do not include ** and # +in the response except for specific references where necessary. Use numbers, Roman numerals, or letters instead, and format the response well. 
""" + + try: + completion = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + temperature=0.2 + ) + return completion.choices[0].message.content + except Exception as e: + return f"Analysis failed: {e}" + + def analyze_url(https://github.com/self,%20url:%20str,%20timeout:%20int%20=%2010) -> Dict: + """Analyze a single URL""" + print(f"\nAnalyzing: {url}") + print("-" * 50) + + headers, status_code = self.fetch_headers(url, timeout) + if not headers: + return {"url": url, "error": "Failed to fetch headers"} + + print(f"Status Code: {status_code}") + print(f"\nHTTP Headers ({len(headers)} found):") + print("-" * 30) + for key, value in headers.items(): + print(f"{key}: {value}") + + print(f"\nAnalyzing with AI...") + analysis = self.analyze_headers(url, headers, status_code) + + print("\nSECURITY ANALYSIS") + print("=" * 50) + print(analysis) + + return { + "url": url, + "status_code": status_code, + "headers_count": len(headers), + "analysis": analysis, + "raw_headers": headers + } + + def analyze_multiple_urls(self, urls: List[str], timeout: int = 10) -> List[Dict]: + """Analyze multiple URLs""" + results = [] + for i, url in enumerate(urls, 1): + print(f"\n[{i}/{len(urls)}]") + result = self.analyze_url(https://github.com/url,%20timeout) + results.append(result) + return results + + def export_results(self, results: List[Dict], filename: str): + """Export results to JSON""" + with open(filename, 'w') as f: + json.dump(results, f, indent=2, ensure_ascii=False) + print(f"\nResults exported to: {filename}") + +def main(): + parser = argparse.ArgumentParser( + description='Analyze HTTP security headers using AI', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog='''Examples: + python security_headers.py https://example.com + python security_headers.py example.com google.com + python security_headers.py example.com --export results.json + +Environment Variables: + OPENROUTER_API_KEY - API key for OpenRouter + OPENAI_API_KEY - API key for OpenAI + LLM_MODEL - Model to use (default: deepseek/deepseek-chat-v3.1:free)''' + ) + + parser.add_argument('urls', nargs='+', help='URLs to analyze') + parser.add_argument('--api-key', help='API key for LLM service') + parser.add_argument('--base-url', help='Base URL for LLM API') + parser.add_argument('--model', help='LLM model to use') + parser.add_argument('--timeout', type=int, default=10, help='Request timeout (default: 10s)') + parser.add_argument('--export', help='Export results to JSON file') + + args = parser.parse_args() + + try: + analyzer = SecurityHeadersAnalyzer( + api_key=args.api_key, + base_url=args.base_url, + model=args.model + ) + + results = analyzer.analyze_multiple_urls(args.urls, args.timeout) + + if args.export: + analyzer.export_results(results, args.export) + + except ValueError as e: + print(f"Error: {e}") + return 1 + except KeyboardInterrupt: + print("\nAnalysis interrupted by user") + return 1 + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/ethical-hacking/http-security-headers/requirements.txt b/ethical-hacking/http-security-headers/requirements.txt new file mode 100644 index 00000000..f0dd0aec --- /dev/null +++ b/ethical-hacking/http-security-headers/requirements.txt @@ -0,0 +1 @@ +openai \ No newline at end of file diff --git a/general/fastmcp-mcp-client-server-todo-manager/README.md b/general/fastmcp-mcp-client-server-todo-manager/README.md new file mode 100644 index 00000000..dd988428 --- /dev/null 
+++ b/general/fastmcp-mcp-client-server-todo-manager/README.md @@ -0,0 +1,39 @@ +# Build a real MCP client and server in Python with FastMCP (Todo Manager example) + +This folder contains the code that accompanies the article: + +- Article: https://www.thepythoncode.com/article/fastmcp-mcp-client-server-todo-manager + +What’s included +- `todo_server.py`: FastMCP MCP server exposing tools, resources, and a prompt for a Todo Manager. +- `todo_client_test.py`: A small client script that connects to the server and exercises all features. +- `requirements.txt`: Python dependencies for this tutorial. + +Quick start +1) Install requirements +```bash +python -m venv .venv && source .venv/bin/activate # or use your preferred env manager +pip install -r requirements.txt +``` + +2) Run the server (stdio transport by default) +```bash +python todo_server.py +``` + +3) In a separate terminal, run the client +```bash +python todo_client_test.py +``` + +Optional: run the server over HTTP +- In `todo_server.py`, replace the last line with: +```python +mcp.run(transport="http", host="127.0.0.1", port=8000) +``` +- Then change the client constructor to `Client("http://127.0.0.1:8000/mcp")`. + +Notes +- Requires Python 3.10+. +- The example uses in-memory storage for simplicity. +- For production tips (HTTPS, auth, containerization), see the article. diff --git a/general/fastmcp-mcp-client-server-todo-manager/requirements.txt b/general/fastmcp-mcp-client-server-todo-manager/requirements.txt new file mode 100644 index 00000000..2c9387f7 --- /dev/null +++ b/general/fastmcp-mcp-client-server-todo-manager/requirements.txt @@ -0,0 +1 @@ +fastmcp>=2.12 \ No newline at end of file diff --git a/general/fastmcp-mcp-client-server-todo-manager/todo_client_test.py b/general/fastmcp-mcp-client-server-todo-manager/todo_client_test.py new file mode 100644 index 00000000..f01a1e78 --- /dev/null +++ b/general/fastmcp-mcp-client-server-todo-manager/todo_client_test.py @@ -0,0 +1,50 @@ +import asyncio +from fastmcp import Client + +async def main(): + # Option A: Connect to local Python script (stdio) + client = Client("todo_server.py") + + # Option B: In-memory (for tests) + # from todo_server import mcp + # client = Client(mcp) + + async with client: + await client.ping() + print("[OK] Connected") + + # Create a few todos + t1 = await client.call_tool("create_todo", {"title": "Write README", "priority": "high"}) + t2 = await client.call_tool("create_todo", {"title": "Refactor utils", "description": "Split helpers into modules"}) + t3 = await client.call_tool("create_todo", {"title": "Add tests", "priority": "low"}) + print("Created IDs:", t1.data["id"], t2.data["id"], t3.data["id"]) + + # List open + open_list = await client.call_tool("list_todos", {"status": "open"}) + print("Open IDs:", [t["id"] for t in open_list.data["items"]]) + + # Complete one + updated = await client.call_tool("complete_todo", {"todo_id": t2.data["id"]}) + print("Completed:", updated.data["id"], "status:", updated.data["status"]) + + # Search + found = await client.call_tool("search_todos", {"query": "readme"}) + print("Search 'readme':", [t["id"] for t in found.data["items"]]) + + # Resources + stats = await client.read_resource("stats://todos") + print("Stats:", getattr(stats[0], "text", None) or stats[0]) + + todo2 = await client.read_resource(f"todo://{t2.data['id']}") + print("todo://{id}:", getattr(todo2[0], "text", None) or todo2[0]) + + # Prompt + prompt_msgs = await client.get_prompt("suggest_next_action", {"pending": 2, "project": 
"MCP tutorial"}) + msgs_pretty = [ + {"role": m.role, "content": getattr(m, "content", None) or getattr(m, "text", None)} + for m in getattr(prompt_msgs, "messages", []) + ] + print("Prompt messages:", msgs_pretty) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/general/fastmcp-mcp-client-server-todo-manager/todo_server.py b/general/fastmcp-mcp-client-server-todo-manager/todo_server.py new file mode 100644 index 00000000..64f99b73 --- /dev/null +++ b/general/fastmcp-mcp-client-server-todo-manager/todo_server.py @@ -0,0 +1,88 @@ +from typing import Literal +from itertools import count +from datetime import datetime, timezone +from fastmcp import FastMCP + +# In-memory storage for demo purposes +TODOS: list[dict] = [] +_id = count(start=1) + +mcp = FastMCP(name="Todo Manager") + +@mcp.tool +def create_todo( + title: str, + description: str = "", + priority: Literal["low", "medium", "high"] = "medium", +) -> dict: + """Create a todo (id, title, status, priority, timestamps).""" + todo = { + "id": next(_id), + "title": title, + "description": description, + "priority": priority, + "status": "open", + "created_at": datetime.now(timezone.utc).isoformat(), + "completed_at": None, + } + TODOS.append(todo) + return todo + +@mcp.tool +def list_todos(status: Literal["open", "done", "all"] = "open") -> dict: + """List todos by status ('open' | 'done' | 'all').""" + if status == "all": + items = TODOS + elif status == "open": + items = [t for t in TODOS if t["status"] == "open"] + else: + items = [t for t in TODOS if t["status"] == "done"] + return {"items": items} + +@mcp.tool +def complete_todo(todo_id: int) -> dict: + """Mark a todo as done.""" + for t in TODOS: + if t["id"] == todo_id: + t["status"] = "done" + t["completed_at"] = datetime.now(timezone.utc).isoformat() + return t + raise ValueError(f"Todo {todo_id} not found") + +@mcp.tool +def search_todos(query: str) -> dict: + """Case-insensitive search in title/description.""" + q = query.lower().strip() + items = [t for t in TODOS if q in t["title"].lower() or q in t["description"].lower()] + return {"items": items} + +# Read-only resources +@mcp.resource("stats://todos") +def todo_stats() -> dict: + """Aggregated stats: total, open, done.""" + total = len(TODOS) + open_count = sum(1 for t in TODOS if t["status"] == "open") + done_count = total - open_count + return {"total": total, "open": open_count, "done": done_count} + +@mcp.resource("todo://{id}") +def get_todo(id: int) -> dict: + """Fetch a single todo by id.""" + for t in TODOS: + if t["id"] == id: + return t + raise ValueError(f"Todo {id} not found") + +# A reusable prompt +@mcp.prompt +def suggest_next_action(pending: int, project: str | None = None) -> str: + """Render a small instruction for an LLM to propose next action.""" + base = f"You have {pending} pending TODOs. " + if project: + base += f"They relate to the project '{project}'. " + base += "Suggest the most impactful next action in one short sentence." + return base + +if __name__ == "__main__": + # Default transport is stdio; you can also use transport="http", host=..., port=... 
+ mcp.run() diff --git a/general/interactive-weather-plot/interactive_weather_plot.py b/general/interactive-weather-plot/interactive_weather_plot.py index b4d17141..3d1ea566 100644 --- a/general/interactive-weather-plot/interactive_weather_plot.py +++ b/general/interactive-weather-plot/interactive_weather_plot.py @@ -68,7 +68,7 @@ def changeLocation(newLocation): # Making the Radio Buttons buttons = RadioButtons( ax=plt.axes([0.1, 0.1, 0.2, 0.2]), - labels=locations.keys() + labels=list(locations.keys()) ) # Connect click event on the buttons to the function that changes location. @@ -86,4 +86,4 @@ def changeLocation(newLocation): plt.savefig('file.svg', format='svg') -plt.show() \ No newline at end of file +plt.show() diff --git a/gui-programming/rich-text-editor/rich_text_editor.py b/gui-programming/rich-text-editor/rich_text_editor.py index 10c14263..05259905 100644 --- a/gui-programming/rich-text-editor/rich_text_editor.py +++ b/gui-programming/rich-text-editor/rich_text_editor.py @@ -112,9 +112,9 @@ def fileManager(event=None, action=None): document['tags'][tagName] = [] ranges = textArea.tag_ranges(tagName) - - for i, tagRange in enumerate(ranges[::2]): - document['tags'][tagName].append([str(tagRange), str(ranges[i+1])]) + + for i in range(0, len(ranges), 2): + document['tags'][tagName].append([str(ranges[i]), str(ranges[i + 1])]) if not filePath: # ask the user for a filename with the native file explorer. diff --git a/handling-pdf-files/pdf-compressor/README.md b/handling-pdf-files/pdf-compressor/README.md index 4527174c..307f105c 100644 --- a/handling-pdf-files/pdf-compressor/README.md +++ b/handling-pdf-files/pdf-compressor/README.md @@ -1,8 +1,48 @@ # [How to Compress PDF Files in Python](https://www.thepythoncode.com/article/compress-pdf-files-in-python) -To run this: -- `pip3 install -r requirements.txt` -- To compress `bert-paper.pdf` file: - ``` - $ python pdf_compressor.py bert-paper.pdf bert-paper-min.pdf - ``` - This will spawn a new compressed PDF file under the name `bert-paper-min.pdf`. + +This directory contains two approaches: + +- Legacy (commercial): `pdf_compressor.py` uses PDFTron/PDFNet. PDFNet now requires a license key and the old pip package is not freely available, so this may not work without a license. +- Recommended (open source): `pdf_compressor_ghostscript.py` uses Ghostscript to compress PDFs. + +## Ghostscript method (recommended) + +Prerequisite: Install Ghostscript + +- macOS (Homebrew): + - `brew install ghostscript` +- Ubuntu/Debian: + - `sudo apt-get update && sudo apt-get install -y ghostscript` +- Windows: + - Download and install from https://ghostscript.com/releases/ + - Ensure `gswin64c.exe` (or `gswin32c.exe`) is in your PATH. + +No Python packages are required for this method, only Ghostscript. + +### Usage + +To compress `bert-paper.pdf` into `bert-paper-min.pdf` with default quality (`power=2`): + +``` +python pdf_compressor_ghostscript.py bert-paper.pdf bert-paper-min.pdf +``` + +Optional quality level `[power]` controls compression/quality tradeoff (maps to Ghostscript `-dPDFSETTINGS`): + +- 0 = `/screen` (smallest, lowest quality) +- 1 = `/ebook` (good quality) +- 2 = `/printer` (high quality) [default] +- 3 = `/prepress` (very high quality) +- 4 = `/default` (Ghostscript default) + +Example: + +``` +python pdf_compressor_ghostscript.py bert-paper.pdf bert-paper-min.pdf 1 +``` + +In testing, `bert-paper.pdf` (~757 KB) compressed to ~407 KB with `power=1`. 
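The compressor can also be driven from Python rather than the command line. Below is a minimal usage sketch that imports `compress_file` from the new `pdf_compressor_ghostscript.py`; it assumes Ghostscript is installed and on PATH, and the file names are placeholders:

```python
# Minimal sketch: call the Ghostscript-based compressor programmatically.
# Assumes Ghostscript is on PATH and this code runs next to pdf_compressor_ghostscript.py;
# the PDF file names below are placeholders.
from pdf_compressor_ghostscript import compress_file

# power=1 maps to Ghostscript's /ebook setting (good quality, strong compression).
ok = compress_file("bert-paper.pdf", "bert-paper-min.pdf", power=1)
print("Compressed successfully" if ok else "Compression failed")
```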
+ +## Legacy PDFNet method (requires license) + +If you have a valid license and the PDFNet SDK installed, you can use the original `pdf_compressor.py` script. Note that the previously referenced `PDFNetPython3` pip package is not freely available and may not install via pip. Refer to the vendor's documentation for installation and licensing. \ No newline at end of file diff --git a/handling-pdf-files/pdf-compressor/pdf_compressor_ghostscript.py b/handling-pdf-files/pdf-compressor/pdf_compressor_ghostscript.py new file mode 100644 index 00000000..88de4062 --- /dev/null +++ b/handling-pdf-files/pdf-compressor/pdf_compressor_ghostscript.py @@ -0,0 +1,103 @@ +import os +import sys +import subprocess +import shutil + + +def get_size_format(b, factor=1024, suffix="B"): + for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]: + if b < factor: + return f"{b:.2f}{unit}{suffix}" + b /= factor + return f"{b:.2f}Y{suffix}" + + +def find_ghostscript_executable(): + candidates = [ + shutil.which('gs'), + shutil.which('gswin64c'), + shutil.which('gswin32c'), + ] + for c in candidates: + if c: + return c + return None + + +def compress_file(input_file: str, output_file: str, power: int = 2): + """Compress PDF using Ghostscript. + + power: + 0 -> /screen (lowest quality, highest compression) + 1 -> /ebook (good quality) + 2 -> /printer (high quality) [default] + 3 -> /prepress (very high quality) + 4 -> /default (Ghostscript default) + """ + if not os.path.exists(input_file): + raise FileNotFoundError(f"Input file not found: {input_file}") + if not output_file: + output_file = input_file + + initial_size = os.path.getsize(input_file) + + gs = find_ghostscript_executable() + if not gs: + raise RuntimeError( + "Ghostscript not found. Install it and ensure 'gs' (Linux/macOS) " + "or 'gswin64c'/'gswin32c' (Windows) is in PATH." 
+ ) + + settings_map = { + 0: '/screen', + 1: '/ebook', + 2: '/printer', + 3: '/prepress', + 4: '/default', + } + pdfsettings = settings_map.get(power, '/printer') + + cmd = [ + gs, + '-sDEVICE=pdfwrite', + '-dCompatibilityLevel=1.4', + f'-dPDFSETTINGS={pdfsettings}', + '-dNOPAUSE', + '-dBATCH', + '-dQUIET', + f'-sOutputFile={output_file}', + input_file, + ] + + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError as e: + print(f"Ghostscript failed: {e}") + return False + + compressed_size = os.path.getsize(output_file) + ratio = 1 - (compressed_size / initial_size) + summary = { + "Input File": input_file, + "Initial Size": get_size_format(initial_size), + "Output File": output_file, + "Compressed Size": get_size_format(compressed_size), + "Compression Ratio": f"{ratio:.3%}", + } + + print("## Summary ########################################################") + for k, v in summary.items(): + print(f"{k}: {v}") + print("###################################################################") + return True + + +if __name__ == '__main__': + if len(sys.argv) < 3: + print("Usage: python pdf_compressor_ghostscript.py <input.pdf> <output.pdf> [power 0-4]") + sys.exit(1) + input_file = sys.argv[1] + output_file = sys.argv[2] + power = int(sys.argv[3]) if len(sys.argv) > 3 else 2 + ok = compress_file(input_file, output_file, power) + sys.exit(0 if ok else 2) \ No newline at end of file diff --git a/handling-pdf-files/pdf-compressor/requirements.txt b/handling-pdf-files/pdf-compressor/requirements.txt index 0a664a86..9f6e5337 100644 --- a/handling-pdf-files/pdf-compressor/requirements.txt +++ b/handling-pdf-files/pdf-compressor/requirements.txt @@ -1 +1,7 @@ -PDFNetPython3==8.1.0 \ No newline at end of file +# No Python dependencies required for Ghostscript-based compressor. +# System dependency: Ghostscript +# - macOS: brew install ghostscript +# - Debian: sudo apt-get install -y ghostscript +# - Windows: https://ghostscript.com/releases/ +# +# The legacy script (pdf_compressor.py) depends on PDFNet (commercial) and a license key. 
\ No newline at end of file diff --git a/images/codingfleet-banner-2.png b/images/codingfleet-banner-2.png new file mode 100644 index 00000000..e95c4d27 Binary files /dev/null and b/images/codingfleet-banner-2.png differ diff --git a/images/codingfleet-banner-3.png b/images/codingfleet-banner-3.png new file mode 100644 index 00000000..9f27495e Binary files /dev/null and b/images/codingfleet-banner-3.png differ diff --git a/web-scraping/youtube-extractor/extract_video_info.py b/web-scraping/youtube-extractor/extract_video_info.py index 042ce4f8..bed184b0 100644 --- a/web-scraping/youtube-extractor/extract_video_info.py +++ b/web-scraping/youtube-extractor/extract_video_info.py @@ -1,92 +1,150 @@ -from requests_html import HTMLSession -from bs4 import BeautifulSoup as bs +import requests +from bs4 import BeautifulSoup import re import json - -# init session -session = HTMLSession() - +import argparse def get_video_info(url): - # download HTML code - response = session.get(url) - # execute Javascript - response.html.render(timeout=60) - # create beautiful soup object to parse HTML - soup = bs(response.html.html, "html.parser") - # open("index.html", "w").write(response.html.html) - # initialize the result - result = {} - # video title - result["title"] = soup.find("meta", itemprop="name")['content'] - # video views - result["views"] = soup.find("meta", itemprop="interactionCount")['content'] - # video description - result["description"] = soup.find("meta", itemprop="description")['content'] - # date published - result["date_published"] = soup.find("meta", itemprop="datePublished")['content'] - # get the duration of the video - result["duration"] = soup.find("span", {"class": "ytp-time-duration"}).text - # get the video tags - result["tags"] = ', '.join([ meta.attrs.get("content") for meta in soup.find_all("meta", {"property": "og:video:tag"}) ]) - - # Additional video and channel information (with help from: https://stackoverflow.com/a/68262735) - data = re.search(r"var ytInitialData = ({.*?});", soup.prettify()).group(1) - data_json = json.loads(data) - videoPrimaryInfoRenderer = data_json['contents']['twoColumnWatchNextResults']['results']['results']['contents'][0]['videoPrimaryInfoRenderer'] - videoSecondaryInfoRenderer = data_json['contents']['twoColumnWatchNextResults']['results']['results']['contents'][1]['videoSecondaryInfoRenderer'] - # number of likes - likes_label = videoPrimaryInfoRenderer['videoActions']['menuRenderer']['topLevelButtons'][0]['toggleButtonRenderer']['defaultText']['accessibility']['accessibilityData']['label'] # "No likes" or "###,### likes" - likes_str = likes_label.split(' ')[0].replace(',','') - result["likes"] = '0' if likes_str == 'No' else likes_str - # number of likes (old way) doesn't always work - # text_yt_formatted_strings = soup.find_all("yt-formatted-string", {"id": "text", "class": "ytd-toggle-button-renderer"}) - # result["likes"] = ''.join([ c for c in text_yt_formatted_strings[0].attrs.get("aria-label") if c.isdigit() ]) - # result["likes"] = 0 if result['likes'] == '' else int(result['likes']) - # number of dislikes - YouTube does not publish this anymore... 
- # result["dislikes"] = ''.join([ c for c in text_yt_formatted_strings[1].attrs.get("aria-label") if c.isdigit() ]) - # result["dislikes"] = '0' if result['dislikes'] == '' else result['dislikes'] - result['dislikes'] = 'UNKNOWN' - # channel details - channel_tag = soup.find("meta", itemprop="channelId")['content'] - # channel name - channel_name = soup.find("span", itemprop="author").next.next['content'] - # channel URL - # channel_url = soup.find("span", itemprop="author").next['href'] - channel_url = f"https://www.youtube.com/%7Bchannel_tag%7D" - # number of subscribers as str - channel_subscribers = videoSecondaryInfoRenderer['owner']['videoOwnerRenderer']['subscriberCountText']['accessibility']['accessibilityData']['label'] - # channel details (old way) - # channel_tag = soup.find("yt-formatted-string", {"class": "ytd-channel-name"}).find("a") - # # channel name (old way) - # channel_name = channel_tag.text - # # channel URL (old way) - # channel_url = f"https://www.youtube.com{channel_tag['href']}" - # number of subscribers as str (old way) - # channel_subscribers = soup.find("yt-formatted-string", {"id": "owner-sub-count"}).text.strip() - result['channel'] = {'name': channel_name, 'url': channel_url, 'subscribers': channel_subscribers} - return result + """ + Extract video information from YouTube using modern approach + """ + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } + + try: + # Download HTML code + response = requests.get(url, headers=headers) + response.raise_for_status() + + # Create beautiful soup object to parse HTML + soup = BeautifulSoup(response.text, "html.parser") + + # Initialize the result + result = {} + + # Extract ytInitialData which contains all the video information + data_match = re.search(r'var ytInitialData = ({.*?});', response.text) + if not data_match: + raise Exception("Could not find ytInitialData in page") + + data_json = json.loads(data_match.group(1)) + + # Get the main content sections + contents = data_json['contents']['twoColumnWatchNextResults']['results']['results']['contents'] + + # Extract video information from videoPrimaryInfoRenderer + if 'videoPrimaryInfoRenderer' in contents[0]: + primary = contents[0]['videoPrimaryInfoRenderer'] + + # Video title + result["title"] = primary['title']['runs'][0]['text'] + + # Video views + result["views"] = primary['viewCount']['videoViewCountRenderer']['viewCount']['simpleText'] + + # Date published + result["date_published"] = primary['dateText']['simpleText'] + + # Extract channel information from videoSecondaryInfoRenderer + secondary = None + if 'videoSecondaryInfoRenderer' in contents[1]: + secondary = contents[1]['videoSecondaryInfoRenderer'] + owner = secondary['owner']['videoOwnerRenderer'] + + # Channel name + channel_name = owner['title']['runs'][0]['text'] + + # Channel ID + channel_id = owner['navigationEndpoint']['browseEndpoint']['browseId'] + + # Channel URL - FIXED with proper /channel/ path + channel_url = f"https://www.youtube.com/channel/%7Bchannel_id%7D" + + # Number of subscribers + channel_subscribers = owner['subscriberCountText']['accessibility']['accessibilityData']['label'] + + result['channel'] = { + 'name': channel_name, + 'url': channel_url, + 'subscribers': channel_subscribers + } + + # Extract video description + if secondary and 'attributedDescription' in secondary: + description_runs = secondary['attributedDescription']['content'] + 
result["description"] = description_runs + else: + result["description"] = "Description not available" + + # Try to extract video duration from player overlay + # This is a fallback approach since the original method doesn't work + duration_match = re.search(r'"approxDurationMs":"(\d+)"', response.text) + if duration_match: + duration_ms = int(duration_match.group(1)) + minutes = duration_ms // 60000 + seconds = (duration_ms % 60000) // 1000 + result["duration"] = f"{minutes}:{seconds:02d}" + else: + result["duration"] = "Duration not available" + + # Extract video tags if available + video_tags = [] + if 'keywords' in data_json.get('metadata', {}).get('videoMetadataRenderer', {}): + video_tags = data_json['metadata']['videoMetadataRenderer']['keywords'] + result["tags"] = ', '.join(video_tags) if video_tags else "No tags available" + + # Extract likes (modern approach) + result["likes"] = "Likes count not available" + result["dislikes"] = "UNKNOWN" # YouTube no longer shows dislikes + + # Try to find likes in the new structure + for content in contents: + if 'compositeVideoPrimaryInfoRenderer' in content: + composite = content['compositeVideoPrimaryInfoRenderer'] + if 'likeButton' in composite: + like_button = composite['likeButton'] + if 'toggleButtonRenderer' in like_button: + toggle = like_button['toggleButtonRenderer'] + if 'defaultText' in toggle: + default_text = toggle['defaultText'] + if 'accessibility' in default_text: + accessibility = default_text['accessibility'] + if 'accessibilityData' in accessibility: + label = accessibility['accessibilityData']['label'] + if 'like' in label.lower(): + result["likes"] = label + + return result + + except Exception as e: + raise Exception(f"Error extracting video info: {str(e)}") if __name__ == "__main__": - import argparse parser = argparse.ArgumentParser(description="YouTube Video Data Extractor") parser.add_argument("url", help="URL of the YouTube video") args = parser.parse_args() + # parse the video URL from command line url = args.url - data = get_video_info(url) + try: + data = get_video_info(url) - # print in nice format - print(f"Title: {data['title']}") - print(f"Views: {data['views']}") - print(f"Published at: {data['date_published']}") - print(f"Video Duration: {data['duration']}") - print(f"Video tags: {data['tags']}") - print(f"Likes: {data['likes']}") - print(f"Dislikes: {data['dislikes']}") - print(f"\nDescription: {data['description']}\n") - print(f"\nChannel Name: {data['channel']['name']}") - print(f"Channel URL: {data['channel']['url']}") - print(f"Channel Subscribers: {data['channel']['subscribers']}") + # print in nice format + print(f"Title: {data['title']}") + print(f"Views: {data['views']}") + print(f"Published at: {data['date_published']}") + print(f"Video Duration: {data['duration']}") + print(f"Video tags: {data['tags']}") + print(f"Likes: {data['likes']}") + print(f"Dislikes: {data['dislikes']}") + print(f"\nDescription: {data['description']}\n") + print(f"\nChannel Name: {data['channel']['name']}") + print(f"Channel URL: {data['channel']['url']}") + print(f"Channel Subscribers: {data['channel']['subscribers']}") + + except Exception as e: + print(f"Error: {e}") + print("\nNote: YouTube frequently changes its structure, so this script may need updates.") \ No newline at end of file diff --git a/web-scraping/youtube-transcript-summarizer/youtube_transcript_summarizer.py b/web-scraping/youtube-transcript-summarizer/youtube_transcript_summarizer.py index 6d4983ef..bdb80f54 100644 --- 
a/web-scraping/youtube-transcript-summarizer/youtube_transcript_summarizer.py +++ b/web-scraping/youtube-transcript-summarizer/youtube_transcript_summarizer.py @@ -1,8 +1,7 @@ import os -import re +import sys import nltk import pytube -import youtube_transcript_api from youtube_transcript_api import YouTubeTranscriptApi from nltk.corpus import stopwords from nltk.tokenize import sent_tokenize, word_tokenize @@ -21,11 +20,17 @@ nltk.download('punkt', quiet=True) nltk.download('stopwords', quiet=True) -# Initialize OpenAI client -client = OpenAI( - base_url="https://openrouter.ai/api/v1", - api_key="", # Add your OpenRouter API key here -) +# Initialize OpenAI client from environment variable +# Expect the OpenRouter API key to be provided via OPENROUTER_API_KEY +api_key = os.getenv("OPENROUTER_API_KEY") +if not api_key: + print(Fore.RED + "Error: OPENROUTER_API_KEY environment variable is not set or is empty.") + sys.exit(1) +else: + client = OpenAI( + base_url="https://openrouter.ai/api/v1", + api_key=api_key, + ) def extract_video_id(youtube_url): """Extract the video ID from a YouTube URL.""" @@ -48,8 +53,10 @@ def extract_video_id(youtube_url): def get_transcript(video_id): """Get the transcript of a YouTube video.""" try: - transcript = YouTubeTranscriptApi.get_transcript(video_id) - return ' '.join([entry['text'] for entry in transcript]) + youtube_transcript_api = YouTubeTranscriptApi() + fetched_transcript = youtube_transcript_api.fetch(video_id) + full_transcript = " ".join([snippet.text for snippet in fetched_transcript.snippets]) + return full_transcript.strip() except Exception as e: return f"Error retrieving transcript: {str(e)}."
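For clarity, the transcript retrieval above switches from the older `get_transcript()` call to the instance-based `fetch()` API of `youtube_transcript_api`, exactly as used in the change. A minimal standalone sketch of the same pattern, assuming the package is installed; the video ID is a placeholder:

```python
# Minimal sketch of the instance-based youtube-transcript-api usage from the change above.
from youtube_transcript_api import YouTubeTranscriptApi

api = YouTubeTranscriptApi()
fetched = api.fetch("VIDEO_ID_HERE")  # placeholder video ID
text = " ".join(snippet.text for snippet in fetched.snippets)
print(text[:300])  # preview the first few hundred characters
```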