Python code to download curated Beat Saber maps from Beat Saver, recording seen hashes to prevent redownloading.
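The script relies on only a handful of fields from each page returned by the /maps/latest endpoint. A trimmed sketch of the response shape it expects (the field names are exactly the ones the code reads; the values are illustrative placeholders, not real map data):

{
    "docs": [
        {
            "id": "abcd",
            "name": "Example Song",
            "curatedAt": "2024-05-01T00:00:00.000Z",
            "versions": [
                {"hash": "0123456789abcdef", "downloadURL": "https://..."}
            ]
        }
    ]
}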
import requests
import json
import os
import zipfile
import re
import sys
from datetime import datetime

# Function to fetch a page of data from the API endpoint, with an optional "before" cursor
def fetch_data_from_api(url, page_size, before=None):
    if before:
        url = f"{url}&pageSize={page_size}&before={before}"
    else:
        url = f"{url}&pageSize={page_size}"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # Raise an error for bad status codes
        print("API response received.")
        return response.json()
    except requests.RequestException as e:
        log_error(f"Failed to fetch data. Error: {str(e)}")
        return None
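
# Example of a fully constructed request URL (assuming the "before" value is the
# ISO-8601 "curatedAt" timestamp carried over from the previous page):
#   https://api.beatsaver.com/maps/latest?sort=CURATED&pageSize=100&before=2024-05-01T00:00:00.000Z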

# Function to download a file from a URL with one retry, returning True on success
def download_file(url, filename):
    for attempt in (1, 2):
        try:
            response = requests.get(url, stream=True, timeout=10)
            response.raise_for_status()  # Raise an error for bad status codes
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
            print(f"Downloaded: {filename}")
            return True
        except requests.RequestException as e:
            log_error(f"Attempt {attempt} failed to download file from {url}. "
                      f"Filename: {filename}. Error: {str(e)}")
    return False

# Function to unzip a file into a specified directory
def unzip_file(zip_path, extract_to):
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        print(f"Unzipped: {zip_path} to {extract_to}")
    except zipfile.BadZipFile as e:
        log_error(f"Failed to unzip file {zip_path}. Error: {str(e)}")

# Function to sanitize folder names
def sanitize_filename(name):
    # Replace invalid characters with an underscore
    return re.sub(r'[<>:"/\\|?*]', '_', name)
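
# For example, sanitize_filename('Map: "Hard/Expert"') returns 'Map_ _Hard_Expert_'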

# Function to log errors to a log file
def log_error(message):
    with open("log.txt", "a") as log_file:
        log_file.write(f"[{datetime.now()}] {message}\n")
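
# Entries in log.txt look like this (the timestamp is datetime.now()'s default format):
#   [2024-06-01 01:27:00.123456] Failed to unzip file 0123456789abcdef.zip. Error: ...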

# Main function to process the data
def main(max_downloads=1000):
    api_url = 'https://api.beatsaver.com/maps/latest?sort=CURATED'
    seen_hashes_file = 'seen_hashes.json'
    if os.path.exists(seen_hashes_file):
        with open(seen_hashes_file, 'r') as f:
            seen_hashes = set(json.load(f))
    else:
        seen_hashes = set()

    new_hashes = set()
    total_downloads = 0
    before = None
    encountered_existing_hash = False

    while total_downloads < max_downloads and not encountered_existing_hash:
        remaining_downloads = max_downloads - total_downloads
        page_size = min(100, remaining_downloads)
        data = fetch_data_from_api(api_url, page_size, before)
        if data is None or not data.get('docs'):
            break

        # First pass: count how many maps on this page are new, stopping at the
        # first hash we have already seen (pages are sorted newest first)
        matches_existing = 0
        will_download = 0
        for doc in data['docs']:
            for version in doc.get('versions', []):
                if version.get('hash') in seen_hashes:
                    matches_existing += 1
                    encountered_existing_hash = True
                    break
                will_download += 1
            if encountered_existing_hash:
                break

        remaining_downloads = max_downloads - total_downloads
        if will_download > remaining_downloads:
            will_download = remaining_downloads
        print(f"Number of maps matching existing hashes: {matches_existing}")
        print(f"Number of maps to be downloaded: {will_download}")

        # Second pass: download, extract, and record the new maps
        downloaded_count = 0
        for doc in data['docs']:
            if downloaded_count >= will_download:
                break
            map_id = doc.get('id')
            map_name = doc.get('name', '')
            sanitized_map_name = sanitize_filename(map_name)
            folder_name = f"{map_id} ({sanitized_map_name})"
            for version in doc.get('versions', []):
                if downloaded_count >= will_download:
                    break
                version_hash = version.get('hash')
                download_url = version.get('downloadURL')
                if version_hash in seen_hashes:
                    encountered_existing_hash = True
                    break
                print(f"Downloading: {folder_name}")
                filename = f"{version_hash}.zip"
                if not download_file(download_url, filename):
                    continue  # Both attempts failed; skip extraction
                unzip_file(filename, folder_name)
                try:
                    os.remove(filename)  # Remove the zip file after extraction
                except OSError as e:
                    log_error(f"Failed to remove file {filename}. Error: {str(e)}")
                new_hashes.add(version_hash)
                total_downloads += 1
                downloaded_count += 1
            if encountered_existing_hash or total_downloads >= max_downloads:
                break

        # Determine the oldest curatedAt date for the next "before" parameter
        if total_downloads < max_downloads and not encountered_existing_hash:
            before = min(doc['curatedAt'] for doc in data['docs'])

    # Update the seen hashes file
    seen_hashes.update(new_hashes)
    with open(seen_hashes_file, 'w') as f:
        json.dump(list(seen_hashes), f)

if __name__ == "__main__":
    if len(sys.argv) == 1:
        max_downloads = 100
    elif len(sys.argv) != 2:
        print("Usage: python BeatSyncReplace.py <max_downloads>")
        sys.exit(1)
    else:
        try:
            max_downloads = int(sys.argv[1])
        except ValueError:
            print("Usage: python BeatSyncReplace.py <max_downloads>")
            sys.exit(1)
    main(max_downloads)
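
Usage: python BeatSyncReplace.py <max_downloads>, e.g. python BeatSyncReplace.py 250; with no argument the script downloads up to 100 maps. Each map is extracted into a folder named "<id> (<sanitized name>)" in the working directory, every downloaded hash is appended to seen_hashes.json so that later runs stop at the first already-seen map, and any failures are appended to log.txt.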