better packaging, clean stdout

This commit is contained in:
Somdev Sangwan
2026-05-02 00:17:15 +05:30
parent 5be7638d5e
commit da32aa5aed
7 changed files with 136 additions and 68 deletions
+5
View File
@@ -0,0 +1,5 @@
build/
dist/
*.egg-info/
__pycache__/
*.py[cod]
+10 -7
View File
@@ -80,9 +80,9 @@ docker compose run --rm wappalyzer -i https://example.com -oJ output.json
## For Users
Some common usage examples are given below; refer to the list of all options for more information.
- Scan a single URL:
`wappalyzer -i https://example.com`
- Scan multiple URLs from a file: `wappalyzer -i urls.txt -t 10`
- Scan a single URL: `wappalyzer -i https://example.com`
- Scan multiple URLs from a file: `wappalyzer -i urls.txt -w 10`
- Set page-load timeout for full scans: `wappalyzer -i urls.txt -t 15`
- Scan with authentication: `wappalyzer -i https://example.com -c "sessionid=abc123; token=xyz789"`
- Export results to JSON: `wappalyzer -i https://example.com -oJ results.json`
@@ -95,7 +95,8 @@ Some common usage examples are given below, refer to list of all options for mor
- `fast`: Quick HTTP-based scan (sends 1 request)
- `balanced`: HTTP-based scan with more requests
- `full`: Complete scan using wappalyzer extension
- `-t, --threads`: Number of concurrent threads (default: 5)
- `-w, --workers`: Number of concurrent workers (default: 5; full scans are capped at 3)
- `-t, --timeout`: Maximum seconds to wait for a page load in full scans (default: 30)
- `-oJ`: JSON output file path
- `-oC`: CSV output file path
- `-oH`: HTML output file path
@@ -119,8 +120,9 @@ results = analyze('https://example.com')
results = analyze(
url='https://example.com',
scan_type='full', # 'fast', 'balanced', or 'full'
threads=3,
cookie='sessionid=abc123'
workers=3,
cookie='sessionid=abc123',
timeout=30
)
```
@@ -131,8 +133,9 @@ results = analyze(
- `'fast'`: Quick HTTP-based scan
- `'balanced'`: HTTP-based scan with more requests
- `'full'`: Complete scan including JavaScript execution (default)
- `threads` (int, optional): Number of threads for parallel processing (default: 3)
- `workers` (int, optional): Number of concurrent workers for parallel processing (default: 3)
- `cookie` (str, optional): Cookie header string for authenticated scans
- `timeout` (int, optional): Maximum seconds to wait for a page load in full scans (default: 30)
#### Return Value
+3
View File
@@ -0,0 +1,3 @@
# Build configuration read by PEP 517/518 front ends such as pip and build.
[build-system]
# Packages installed into the isolated build environment before building.
requires = ["setuptools>=61", "wheel"]
# Use the setuptools PEP 517 backend to produce the sdist and wheel.
build-backend = "setuptools.build_meta"
+12 -3
View File
@@ -10,7 +10,7 @@ with io.open(path.join(this_directory, 'README.md'), encoding='utf-8') as f:
setup(
name='wappalyzer',
version='1.0.23',
version='2.0.0',
description='Wappalyzer-based tech stack detection library',
long_description=desc,
long_description_content_type='text/markdown',
@@ -18,11 +18,12 @@ setup(
author_email='s0md3v@gmail.com',
license='GNU General Public License v3',
url='https://github.com/s0md3v/wappalyzer-next',
download_url='https://github.com/s0md3v/wappalyzer-next/archive/1.0.23.zip',
packages=find_packages(),
package_data={'wappalyzer': ['data/*']},
python_requires='>=3.9',
install_requires=[
'requests',
'urllib3',
'huepy',
'selenium',
'tldextract',
@@ -35,7 +36,15 @@ setup(
'Intended Audience :: Information Technology',
'Operating System :: OS Independent',
'Topic :: Security',
'Programming Language :: Python :: 3.4',
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13',
'Programming Language :: Python :: 3.14',
],
entry_points={
'console_scripts': [
+95 -49
View File
@@ -1,6 +1,7 @@
import argparse
import queue
import re
import sys
import threading
import tldextract
@@ -11,7 +12,7 @@ from wappalyzer.core.analyzer import http_scan
from wappalyzer.core.utils import pretty_print, write_to_file
from wappalyzer.browser.analyzer import DriverPool, cookie_to_cookies, process_url, merge_technologies
def analyze(url, scan_type='full', threads=3, cookie=None, timeout=30):
def analyze(url, scan_type='full', workers=3, cookie=None, timeout=30):
"""Analyze a single URL"""
if scan_type.lower() == 'full':
driver_pool = None
@@ -23,32 +24,32 @@ def analyze(url, scan_type='full', threads=3, cookie=None, timeout=30):
driver.add_cookie(cookie_dict)
url, detections = process_url(driver, url)
return {url: merge_technologies(detections)}
finally:
if driver_pool:
try:
driver_pool.cleanup()
except Exception as e:
print(f"Error during final cleanup: {str(e)}")
finally:
if driver_pool:
try:
driver_pool.cleanup()
except Exception as e:
print(f"Error during final cleanup: {str(e)}", file=sys.stderr)
return {url: http_scan(url, scan_type, cookie)}
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-i', help='import from file or enter a url', dest='input_file')
parser.add_argument('--scan-type', help='fast, balanced or full', dest='scan_type', default='full', type=str.lower)
parser.add_argument('-t', '--threads', help='number of threads', dest='thread_num', default=5, type=int)
parser.add_argument('-w', '--workers', help='number of concurrent workers', dest='worker_num', default=5, type=int)
parser.add_argument('-oJ', help='json output file', dest='json_output_file')
parser.add_argument('-oC', help='csv output file', dest='csv_output_file')
parser.add_argument('-oH', help='html output file', dest='html_output_file')
parser.add_argument('-c', '--cookie', help='cookie string', dest='cookie')
parser.add_argument('--timeout', help='maximum seconds to wait for a page load in full scans', dest='timeout', default=30, type=int)
parser.add_argument('-t', '--timeout', help='maximum seconds to wait for a page load in full scans', dest='timeout', default=30, type=int)
args = parser.parse_args()
print('\n\t' + bold(green('wappalyzer')) + '\n')
print('\n\t' + bold(green('wappalyzer')) + '\n', file=sys.stderr)
if not args.input_file:
parser.print_help()
parser.print_help(file=sys.stderr)
exit(22)
def process_urls(urls, num_threads=3, cookie=None, scan_type='full', should_print=False, timeout=30):
def process_urls(urls, num_workers=3, cookie=None, scan_type='full', should_print=False, timeout=30):
"""Process multiple URLs using a driver pool"""
results = {}
driver_pool = None
@@ -66,8 +67,8 @@ def main():
url = url_queue.get_nowait()
except queue.Empty:
break
print(f"Processing: {url}")
detections = None
try:
if scan_type == 'full':
with driver_pool.get_driver() as driver:
@@ -75,22 +76,19 @@ def main():
for cookie_dict in cookie_to_cookies(cookie):
driver.add_cookie(cookie_dict)
url, detections = process_url(driver, url)
if detections:
with lock:
result_queue.put((url, detections))
else:
detections = http_scan(url, scan_type, cookie)
with lock:
result_queue.put((url, detections))
except Exception as e:
print(f"Error processing: {url}")
print(f"Error processing: {url}", file=sys.stderr)
finally:
with lock:
result_queue.put((url, detections))
url_queue.task_done()
except Exception as e:
print(f"Worker {worker_id} encountered an error: {str(e)}")
print(f"Worker {worker_id} encountered an error: {str(e)}", file=sys.stderr)
try:
worker_count = min(num_threads, 3, len(urls)) if scan_type == 'full' else min(num_threads, len(urls))
worker_count = min(num_workers, 3, len(urls)) if scan_type == 'full' else min(num_workers, len(urls))
driver_pool = DriverPool(size=worker_count, timeout=timeout) if scan_type == 'full' else None # Limit max concurrent drivers
url_queue = Queue()
@@ -108,54 +106,99 @@ def main():
)
thread.start()
threads.append(thread)
# Wait for all tasks to complete or interruption
def clear_status_line():
if should_print:
print('\r\033[K', end='', file=sys.stderr, flush=True)
def print_status(processed_count):
if should_print:
print(
f'\r\033[KProcessed {processed_count}/{len(urls)} URLs',
end='',
file=sys.stderr,
flush=True,
)
def print_finished_result(url, detections):
if not detections:
return
clear_status_line()
if scan_type == 'full':
pretty_print({url: merge_technologies(detections)})
else:
pretty_print({url: detections})
processed_count = 0
print_status(processed_count)
try:
url_queue.join()
while processed_count < len(urls):
try:
url, detections = result_queue.get(timeout=0.1)
except queue.Empty:
if all(not thread.is_alive() for thread in threads):
break
continue
processed_count += 1
if scan_type == 'full':
merged = merge_technologies(detections) if detections else {}
if merged:
results[url] = merged
else:
detections = detections or {}
results[url] = detections
if should_print:
print_finished_result(url, detections)
print_status(processed_count)
result_queue.task_done()
except KeyboardInterrupt:
interrupted = True
print("\nInterrupted! Saving partial results...")
if should_print:
clear_status_line()
print("\nInterrupted! Saving partial results...", file=sys.stderr)
for thread in threads:
thread.join()
# Process available results
while not result_queue.empty():
url, detections = result_queue.get()
if should_print:
if scan_type == 'full':
pretty_print({url: merge_technologies(detections)})
else:
pretty_print({url: detections})
processed_count += 1
if scan_type == 'full':
results[url] = merge_technologies(detections)
merged = merge_technologies(detections) if detections else {}
if merged:
results[url] = merged
else:
detections = detections or {}
results[url] = detections
result_queue.task_done()
if should_print:
print_status(processed_count)
sys.stderr.write('\n')
sys.stderr.flush()
return results
except Exception as e:
print(f"Error in process_urls: {str(e)}")
print(f"Error in process_urls: {str(e)}", file=sys.stderr)
return results
finally:
if driver_pool:
try:
driver_pool.cleanup()
except Exception as e:
print(f"Error during final cleanup: {str(e)}")
# Try forceful cleanup if regular cleanup fails
try:
import psutil
for proc in psutil.process_iter(['name']):
if 'firefox' in proc.info['name'].lower():
proc.kill()
except Exception:
pass
print(f"Error during final cleanup: {str(e)}", file=sys.stderr)
try:
if re.search(r'^https?://', args.input_file.lower()):
should_print = not (args.json_output_file or args.csv_output_file or args.html_output_file)
result = analyze(args.input_file, args.scan_type, args.thread_num, args.cookie, args.timeout)
result = analyze(args.input_file, args.scan_type, args.worker_num, args.cookie, args.timeout)
if should_print:
pretty_print(result)
else:
@@ -164,19 +207,22 @@ def main():
urls = urls_file.read().splitlines()
urls_file.close()
should_print = not (args.json_output_file or args.csv_output_file or args.html_output_file)
result = process_urls(urls, args.thread_num, args.cookie, args.scan_type, should_print=should_print, timeout=args.timeout)
result = process_urls(urls, args.worker_num, args.cookie, args.scan_type, should_print=should_print, timeout=args.timeout)
except FileNotFoundError:
if tldextract.extract('http://' + args.input_file).domain != '':
should_print = not (args.json_output_file or args.csv_output_file or args.html_output_file)
result = analyze('http://' + args.input_file, args.scan_type, args.thread_num, args.cookie, args.timeout)
result = analyze('http://' + args.input_file, args.scan_type, args.worker_num, args.cookie, args.timeout)
if should_print:
pretty_print(result)
else:
print(f"The argument '{args.input_file}' is neither a valid URL nor a file path.")
print(
f"The argument '{args.input_file}' is neither a valid URL nor a file path.",
file=sys.stderr,
)
exit(22)
except KeyboardInterrupt:
print("\nProgram interrupted by user. Saving partial results...")
print("\nProgram interrupted by user. Saving partial results...", file=sys.stderr)
pass
if 'result' in locals():
+8 -7
View File
@@ -1,6 +1,7 @@
import os
import json
import re
import sys
import time
import threading
import concurrent.futures
@@ -382,7 +383,7 @@ def _get_detections_for_current_tab(driver, target_url):
if isinstance(detections, dict):
if detections.get("error"):
print(f"Wappalyzer extension error: {detections['error']}")
print(f"Wappalyzer extension error: {detections['error']}", file=sys.stderr)
detections = detections.get("detections", [])
@@ -506,7 +507,7 @@ class DriverPool:
if driver:
self.pool.put(driver)
except Exception as e:
print(f"Failed to initialize driver: {str(e)}")
print(f"Failed to initialize driver: {str(e)}", file=sys.stderr)
else:
with concurrent.futures.ThreadPoolExecutor(max_workers=size) as executor:
futures = [executor.submit(self._create_driver) for _ in range(size)]
@@ -525,7 +526,7 @@ class DriverPool:
if should_quit:
_quit_driver(driver)
except Exception as e:
print(f"Failed to initialize driver: {str(e)}")
print(f"Failed to initialize driver: {str(e)}", file=sys.stderr)
def _create_driver(self):
"""Create a new Firefox driver with retry logic"""
@@ -571,7 +572,7 @@ class DriverPool:
_get_extension_uuid(driver)
return driver
except Exception as e:
print(f"Attempt {attempt + 1} failed: {str(e)}")
print(f"Attempt {attempt + 1} failed: {str(e)}", file=sys.stderr)
time.sleep(1)
return None
@@ -584,7 +585,7 @@ class DriverPool:
driver = self.pool.get(timeout=30) # Wait up to 30 seconds for a driver
yield driver
except Exception as e:
print(f"Error with driver: {str(e)}")
print(f"Error with driver: {str(e)}", file=sys.stderr)
if driver:
try:
_quit_driver(driver) # Ensure driver is quit on error
@@ -656,7 +657,7 @@ class DriverPool:
except Empty: # Use Empty directly
break
except Exception as e:
print(f"Error during cleanup: {str(e)}")
print(f"Error during cleanup: {str(e)}", file=sys.stderr)
def quit_driver(driver):
try:
@@ -710,7 +711,7 @@ def process_url(driver, url):
return url, _get_detections_for_current_tab(driver, driver.current_url)
except Exception as e:
print(f"Error processing: {url}")
print(f"Error processing: {url}", file=sys.stderr)
return url, []
finally:
popup_handle = getattr(driver, "_wappalyzer_popup_handle", None)
+3 -2
View File
@@ -1,4 +1,5 @@
import requests
import sys
import urllib3
from wappalyzer.core.config import config
@@ -27,5 +28,5 @@ def get_response(url, cookie=None, **kwargs):
response = requests.get(url, headers=headers, verify=False, **kwargs)
return response
except requests.exceptions.RequestException as e:
print(e)
return None
print(e, file=sys.stderr)
return None