Skip to content

Instantly share code, notes, and snippets.

@VidFerris
Created June 1, 2024 01:27
Show Gist options
  • Save VidFerris/e90c6cde98ef3f203843edf176ce1449 to your computer and use it in GitHub Desktop.
Python code to download curated Beat Saber maps from Beat Saver, recording seen hashes to prevent redownloading.
import requests
import json
import os
import zipfile
import re
import sys
from datetime import datetime
# Function to fetch data from the API endpoint with optional "before" parameter
def fetch_data_from_api(url, page_size, before=None):
    """Fetch one page of JSON results from the Beat Saver API.

    Args:
        url: Base API URL; must already contain a query string, since
            additional parameters are appended with '&'.
        page_size: Number of results to request for this page.
        before: Optional timestamp cursor; when given, only results
            older than this are returned.

    Returns:
        The decoded JSON response (dict) on success, or None if the
        request failed (the failure is appended to log.txt).
    """
    url = f"{url}&pageSize={page_size}"
    if before:
        url = f"{url}&before={before}"
    try:
        # Explicit timeout so a stalled connection cannot hang the script
        # forever (matches the 10s timeout already used in download_file).
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # Raise an error for bad status codes
        print("API response received.")
        return response.json()
    except requests.RequestException as e:
        log_error(f"Failed to fetch data. Error: {str(e)}")
        return None
# Function to download file from a URL with retry logic
def download_file(url, filename):
    """Download `url` to `filename`, retrying once on failure.

    The response is streamed in 1 KiB chunks so large map archives are
    never held fully in memory; each attempt uses a 10 second timeout.
    Failures are appended to log.txt. After the second failure the
    function gives up silently: no exception propagates, and `filename`
    may be absent or truncated.
    """
    # The original duplicated the whole request/write body for the retry;
    # a loop keeps one copy of the logic and the same two-attempt behavior.
    for label in ("First", "Second"):
        try:
            response = requests.get(url, stream=True, timeout=10)
            response.raise_for_status()  # Raise an error for bad status codes
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
            print(f"Downloaded: {filename}")
            return
        except requests.RequestException as e:
            log_error(
                f"{label} attempt failed to download file from {url}. "
                f"Filename: {filename}. Error: {str(e)}"
            )
# Function to unzip a file into a specified directory
def unzip_file(zip_path, extract_to):
    """Extract the archive at `zip_path` into the directory `extract_to`.

    A corrupt archive is not fatal: the BadZipFile error is recorded in
    log.txt and the function returns normally.
    """
    try:
        with zipfile.ZipFile(zip_path, 'r') as archive:
            archive.extractall(extract_to)
    except zipfile.BadZipFile as e:
        log_error(f"Failed to unzip file {zip_path}. Error: {str(e)}")
    else:
        print(f"Unzipped: {zip_path} to {extract_to}")
# Function to sanitize folder names
# Translation table mapping every filesystem-unsafe character to '_'.
_INVALID_CHARS = str.maketrans({ch: '_' for ch in '<>:"/\\|?*'})


def sanitize_filename(name):
    """Return `name` with characters that are invalid in Windows
    folder names (< > : " / \\ | ? *) each replaced by an underscore."""
    return name.translate(_INVALID_CHARS)
# Function to log errors to a log file
def log_error(message):
    """Append `message` to log.txt, prefixed with the current timestamp.

    The file is opened with an explicit UTF-8 encoding so messages that
    contain non-ASCII map names or URLs cannot crash logging on
    platforms whose default encoding is narrower (e.g. cp1252).
    """
    with open("log.txt", "a", encoding="utf-8") as log_file:
        log_file.write(f"[{datetime.now()}] {message}\n")
# Main function to process the data
def main(max_downloads=1000):
    """Download up to `max_downloads` curated maps from Beat Saver.

    Pages through the curated feed newest-first. Hashes of previously
    downloaded maps are persisted in seen_hashes.json; as soon as an
    already-seen hash is encountered the run stops, on the assumption
    that everything older was fetched by a previous run.
    """
    api_url = 'https://api.beatsaver.com/maps/latest?sort=CURATED'
    seen_hashes_file = 'seen_hashes.json'
    # Load the set of hashes recorded by previous runs, if any.
    if os.path.exists(seen_hashes_file):
        with open(seen_hashes_file, 'r') as f:
            seen_hashes = set(json.load(f))
    else:
        seen_hashes = set()
    new_hashes = set()
    total_downloads = 0
    before = None  # paging cursor: oldest curatedAt of the previous page
    encountered_existing_hash = False
    while total_downloads < max_downloads and not encountered_existing_hash:
        remaining_downloads = max_downloads - total_downloads
        page_size = min(100, remaining_downloads)
        data = fetch_data_from_api(api_url, page_size, before)
        if data is None or not data.get('docs'):
            break
        # First pass: count how many versions on this page are new.
        matches_existing = 0
        will_download = 0
        for doc in data['docs']:
            for version in doc.get('versions', []):
                version_hash = version.get('hash')
                if version_hash in seen_hashes:
                    # BUGFIX: matches_existing was printed but never
                    # incremented (always reported 0).
                    matches_existing += 1
                    # Feed is newest-first: the first seen hash means the
                    # rest of the feed was downloaded on a previous run.
                    encountered_existing_hash = True
                    break
                will_download += 1
            if encountered_existing_hash:
                break
        if encountered_existing_hash:
            break
        # Cap the page's downloads at whatever budget is left.
        remaining_downloads = max_downloads - total_downloads
        if will_download > remaining_downloads:
            will_download = remaining_downloads
        print(f"Number of maps matching existing hashes: {matches_existing}")
        print(f"Number of maps to be downloaded: {will_download}")
        # Second pass: download, unzip and record each new version.
        downloaded_count = 0
        for doc in data['docs']:
            if downloaded_count >= will_download:
                break
            map_id = doc.get('id')
            map_name = doc.get('name')
            sanitized_map_name = sanitize_filename(map_name)
            folder_name = f"{map_id} ({sanitized_map_name})"
            for version in doc.get('versions', []):
                if downloaded_count >= will_download:
                    break
                version_hash = version.get('hash')
                download_url = version.get('downloadURL')
                if version_hash in seen_hashes:
                    encountered_existing_hash = True
                    break
                print(f"Downloading: {folder_name}")
                filename = f"{version_hash}.zip"
                download_file(download_url, filename)
                unzip_file(filename, folder_name)
                try:
                    os.remove(filename)  # Remove the zip file after extraction
                except OSError as e:
                    # BUGFIX: include the actual filename in the log message
                    # (the original logged a literal placeholder instead).
                    log_error(f"Failed to remove file {filename}. Error: {str(e)}")
                new_hashes.add(version_hash)
                total_downloads += 1
                downloaded_count += 1
        if encountered_existing_hash or total_downloads >= max_downloads:
            break
        # Determine the oldest curatedAt date for the next "before" parameter.
        # BUGFIX: guard against docs missing curatedAt, which previously
        # raised KeyError (or TypeError on None) inside min().
        curated_dates = [d['curatedAt'] for d in data['docs'] if d.get('curatedAt')]
        if not curated_dates:
            break  # cannot page further without a cursor
        before = min(curated_dates)
    # Update the seen hashes file with everything downloaded this run.
    seen_hashes.update(new_hashes)
    with open(seen_hashes_file, 'w') as f:
        json.dump(list(seen_hashes), f)
if __name__ == "__main__":
    # No argument: fall back to a conservative default of 100 downloads.
    if len(sys.argv) == 1:
        max_downloads = 100
    elif len(sys.argv) != 2:
        print("Usage: python BeatSyncReplace.py <max_downloads>")
        sys.exit(1)
    else:
        try:
            max_downloads = int(sys.argv[1])
        except ValueError:
            # BUGFIX: a non-numeric argument previously crashed with an
            # uncaught ValueError traceback instead of showing usage.
            print("Usage: python BeatSyncReplace.py <max_downloads>")
            sys.exit(1)
    main(max_downloads)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment