You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
598 lines
19 KiB
598 lines
19 KiB
import json |
|
import os |
|
import re |
|
import shutil |
|
import subprocess |
|
import sys |
|
import timeit |
|
from copy import deepcopy |
|
from typing import Literal, NotRequired, Optional, TypedDict |
|
|
|
import requests |
|
import yaml |
|
from semver import Version |
|
|
|
# Get TMP_DIR variable from environment |
|
TMP_DIR = os.path.join(os.environ.get("TMP_DIR", "/tmp"), "ohmyzsh") |
|
# Relative path to dependencies.yml file |
|
DEPS_YAML_FILE = ".github/dependencies.yml" |
|
# Dry run flag |
|
DRY_RUN = os.environ.get("DRY_RUN", "0") == "1" |
|
|
|
# utils for tag comparison |
|
BASEVERSION = re.compile( |
|
r"""[vV]? |
|
(?P<major>(0|[1-9])\d*) |
|
(\. |
|
(?P<minor>(0|[1-9])\d*) |
|
(\. |
|
(?P<patch>(0|[1-9])\d*) |
|
)? |
|
)? |
|
""", |
|
re.VERBOSE, |
|
) |
|
|
|
|
|
def coerce(version: str) -> Optional[Version]: |
|
match = BASEVERSION.search(version) |
|
if not match: |
|
return None |
|
|
|
# BASEVERSION looks for `MAJOR.minor.patch` in the string given |
|
# it fills with None if any of them is missing (for example `2.1`) |
|
ver = { |
|
key: 0 if value is None else value for key, value in match.groupdict().items() |
|
} |
|
# Version takes `major`, `minor`, `patch` arguments |
|
ver = Version(**ver) # pyright: ignore[reportArgumentType] |
|
return ver |
|
|
|
|
|
class CodeTimer: |
|
def __init__(self, name=None): |
|
self.name = " '" + name + "'" if name else "" |
|
|
|
def __enter__(self): |
|
self.start = timeit.default_timer() |
|
|
|
def __exit__(self, exc_type, exc_value, traceback): |
|
self.took = (timeit.default_timer() - self.start) * 1000.0 |
|
print("Code block" + self.name + " took: " + str(self.took) + " ms") |
|
|
|
|
|
### YAML representation |
|
def str_presenter(dumper, data): |
|
""" |
|
Configures yaml for dumping multiline strings |
|
Ref: https://stackoverflow.com/a/33300001 |
|
""" |
|
if len(data.splitlines()) > 1: # check for multiline string |
|
return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|") |
|
return dumper.represent_scalar("tag:yaml.org,2002:str", data) |
|
|
|
|
|
yaml.add_representer(str, str_presenter) |
|
yaml.representer.SafeRepresenter.add_representer(str, str_presenter) |
|
|
|
|
|
# Types |
|
class DependencyDict(TypedDict): |
|
repo: str |
|
branch: str |
|
version: str |
|
precopy: NotRequired[str] |
|
postcopy: NotRequired[str] |
|
|
|
|
|
class DependencyYAML(TypedDict): |
|
dependencies: dict[str, DependencyDict] |
|
|
|
|
|
class UpdateStatusFalse(TypedDict): |
|
has_updates: Literal[False] |
|
|
|
|
|
class UpdateStatusTrue(TypedDict): |
|
has_updates: Literal[True] |
|
version: str |
|
compare_url: str |
|
head_ref: str |
|
head_url: str |
|
|
|
|
|
class CommandRunner: |
|
class Exception(Exception): |
|
def __init__(self, message, returncode, stage, stdout, stderr): |
|
super().__init__(message) |
|
self.returncode = returncode |
|
self.stage = stage |
|
self.stdout = stdout |
|
self.stderr = stderr |
|
|
|
@staticmethod |
|
def run_or_fail(command: list[str], stage: str, *args, **kwargs): |
|
if DRY_RUN and command[0] == "gh": |
|
command.insert(0, "echo") |
|
|
|
result = subprocess.run(command, *args, capture_output=True, **kwargs) |
|
|
|
if result.returncode != 0: |
|
raise CommandRunner.Exception( |
|
f"{stage} command failed with exit code {result.returncode}", |
|
returncode=result.returncode, |
|
stage=stage, |
|
stdout=result.stdout.decode("utf-8"), |
|
stderr=result.stderr.decode("utf-8"), |
|
) |
|
|
|
return result |
|
|
|
|
|
class DependencyStore: |
|
store: DependencyYAML = {"dependencies": {}} |
|
|
|
@staticmethod |
|
def set(data: DependencyYAML): |
|
DependencyStore.store = data |
|
|
|
@staticmethod |
|
def update_dependency_version(path: str, version: str) -> DependencyYAML: |
|
with CodeTimer(f"store deepcopy: {path}"): |
|
store_copy = deepcopy(DependencyStore.store) |
|
|
|
dependency = store_copy["dependencies"].get(path) |
|
if dependency is None: |
|
raise ValueError(f"Dependency {path} {version} not found") |
|
dependency["version"] = version |
|
store_copy["dependencies"][path] = dependency |
|
|
|
return store_copy |
|
|
|
@staticmethod |
|
def write_store(file: str, data: DependencyYAML): |
|
with open(file, "w") as yaml_file: |
|
yaml.safe_dump(data, yaml_file, sort_keys=False) |
|
|
|
|
|
class Dependency: |
|
def __init__(self, path: str, values: DependencyDict): |
|
self.path = path |
|
self.values = values |
|
|
|
self.name: str = "" |
|
self.desc: str = "" |
|
self.kind: str = "" |
|
|
|
match path.split("/"): |
|
case ["plugins", name]: |
|
self.name = name |
|
self.kind = "plugin" |
|
self.desc = f"{name} plugin" |
|
case ["themes", name]: |
|
self.name = name.replace(".zsh-theme", "") |
|
self.kind = "theme" |
|
self.desc = f"{self.name} theme" |
|
case _: |
|
self.name = self.desc = path |
|
|
|
def __str__(self): |
|
output: str = "" |
|
for key in DependencyDict.__dict__["__annotations__"].keys(): |
|
if key not in self.values: |
|
output += f"{key}: None\n" |
|
continue |
|
|
|
value = self.values[key] |
|
if "\n" not in value: |
|
output += f"{key}: {value}\n" |
|
else: |
|
output += f"{key}:\n " |
|
output += value.replace("\n", "\n ", value.count("\n") - 1) |
|
return output |
|
|
|
def update_or_notify(self): |
|
# Print dependency settings |
|
print(f"Processing {self.desc}...", file=sys.stderr) |
|
print(self, file=sys.stderr) |
|
|
|
# Check for updates |
|
repo = self.values["repo"] |
|
remote_branch = self.values["branch"] |
|
version = self.values["version"] |
|
is_tag = version.startswith("tag:") |
|
|
|
try: |
|
with CodeTimer(f"update check: {repo}"): |
|
if is_tag: |
|
status = GitHub.check_newer_tag(repo, version.replace("tag:", "")) |
|
else: |
|
status = GitHub.check_updates(repo, remote_branch, version) |
|
|
|
if status["has_updates"] is True: |
|
short_sha = status["head_ref"][:8] |
|
new_version = status["version"] if is_tag else short_sha |
|
|
|
try: |
|
branch_name = f"update/{self.path}/{new_version}" |
|
|
|
# Create new branch |
|
branch = Git.checkout_or_create_branch(branch_name) |
|
|
|
# Update dependencies.yml file |
|
self.__update_yaml( |
|
f"tag:{new_version}" if is_tag else status["version"] |
|
) |
|
|
|
# Update dependency files |
|
self.__apply_upstream_changes() |
|
|
|
# Add all changes and commit |
|
has_new_commit = Git.add_and_commit(self.name, new_version) |
|
|
|
if has_new_commit: |
|
# Push changes to remote |
|
Git.push(branch) |
|
|
|
# Create GitHub PR |
|
GitHub.create_pr( |
|
branch, |
|
f"feat({self.name}): update to version {new_version}", |
|
f"""## Description |
|
|
|
Update for **{self.desc}**: update to version [{new_version}]({status['head_url']}). |
|
Check out the [list of changes]({status['compare_url']}). |
|
""", |
|
) |
|
|
|
# Clean up repository |
|
Git.clean_repo() |
|
except (CommandRunner.Exception, shutil.Error) as e: |
|
# Handle exception on automatic update |
|
match type(e): |
|
case CommandRunner.Exception: |
|
# Print error message |
|
print( |
|
f"Error running {e.stage} command: {e.returncode}", # pyright: ignore[reportAttributeAccessIssue] |
|
file=sys.stderr, |
|
) |
|
print(e.stderr, file=sys.stderr) # pyright: ignore[reportAttributeAccessIssue] |
|
case shutil.Error: |
|
print(f"Error copying files: {e}", file=sys.stderr) |
|
|
|
try: |
|
Git.clean_repo() |
|
except CommandRunner.Exception as e: |
|
print( |
|
f"Error reverting repository to clean state: {e}", |
|
file=sys.stderr, |
|
) |
|
sys.exit(1) |
|
|
|
# Create a GitHub issue to notify maintainer |
|
title = f"{self.path}: update to {new_version}" |
|
body = f"""## Description |
|
|
|
There is a new version of `{self.name}` {self.kind} available. |
|
|
|
New version: [{new_version}]({status['head_url']}) |
|
Check out the [list of changes]({status['compare_url']}). |
|
""" |
|
|
|
print("Creating GitHub issue", file=sys.stderr) |
|
print(f"{title}\n\n{body}", file=sys.stderr) |
|
GitHub.create_issue(title, body) |
|
except Exception as e: |
|
print(e, file=sys.stderr) |
|
|
|
def __update_yaml(self, new_version: str) -> None: |
|
dep_yaml = DependencyStore.update_dependency_version(self.path, new_version) |
|
DependencyStore.write_store(DEPS_YAML_FILE, dep_yaml) |
|
|
|
def __apply_upstream_changes(self) -> None: |
|
# Patterns to ignore in copying files from upstream repo |
|
GLOBAL_IGNORE = [".git", ".github", ".gitignore"] |
|
|
|
path = os.path.abspath(self.path) |
|
precopy = self.values.get("precopy") |
|
postcopy = self.values.get("postcopy") |
|
|
|
repo = self.values["repo"] |
|
branch = self.values["branch"] |
|
remote_url = f"https://github.com/{repo}.git" |
|
repo_dir = os.path.join(TMP_DIR, repo) |
|
|
|
# Clone repository |
|
Git.clone(remote_url, branch, repo_dir, reclone=True) |
|
|
|
# Run precopy on tmp repo |
|
if precopy is not None: |
|
print("Running precopy script:", end="\n ", file=sys.stderr) |
|
print( |
|
precopy.replace("\n", "\n ", precopy.count("\n") - 1), file=sys.stderr |
|
) |
|
CommandRunner.run_or_fail( |
|
["bash", "-c", precopy], cwd=repo_dir, stage="Precopy" |
|
) |
|
|
|
# Copy files from upstream repo |
|
print(f"Copying files from {repo_dir} to {path}", file=sys.stderr) |
|
shutil.copytree( |
|
repo_dir, |
|
path, |
|
dirs_exist_ok=True, |
|
ignore=shutil.ignore_patterns(*GLOBAL_IGNORE), |
|
) |
|
|
|
# Run postcopy on our repository |
|
if postcopy is not None: |
|
print("Running postcopy script:", end="\n ", file=sys.stderr) |
|
print( |
|
postcopy.replace("\n", "\n ", postcopy.count("\n") - 1), |
|
file=sys.stderr, |
|
) |
|
CommandRunner.run_or_fail( |
|
["bash", "-c", postcopy], cwd=path, stage="Postcopy" |
|
) |
|
|
|
|
|
class Git: |
|
default_branch = "master" |
|
|
|
@staticmethod |
|
def clone(remote_url: str, branch: str, repo_dir: str, reclone=False): |
|
# If repo needs to be fresh |
|
if reclone and os.path.exists(repo_dir): |
|
shutil.rmtree(repo_dir) |
|
|
|
# Clone repo in tmp directory and checkout branch |
|
if not os.path.exists(repo_dir): |
|
print( |
|
f"Cloning {remote_url} to {repo_dir} and checking out {branch}", |
|
file=sys.stderr, |
|
) |
|
CommandRunner.run_or_fail( |
|
["git", "clone", "--depth=1", "-b", branch, remote_url, repo_dir], |
|
stage="Clone", |
|
) |
|
|
|
@staticmethod |
|
def checkout_or_create_branch(branch_name: str): |
|
# Get current branch name |
|
result = CommandRunner.run_or_fail( |
|
["git", "rev-parse", "--abbrev-ref", "HEAD"], stage="GetDefaultBranch" |
|
) |
|
Git.default_branch = result.stdout.decode("utf-8").strip() |
|
|
|
# Create new branch and return created branch name |
|
try: |
|
# try to checkout already existing branch |
|
CommandRunner.run_or_fail( |
|
["git", "checkout", branch_name], stage="CreateBranch" |
|
) |
|
except CommandRunner.Exception: |
|
# otherwise create new branch |
|
CommandRunner.run_or_fail( |
|
["git", "checkout", "-b", branch_name], stage="CreateBranch" |
|
) |
|
return branch_name |
|
|
|
@staticmethod |
|
def add_and_commit(scope: str, version: str) -> bool: |
|
""" |
|
Returns `True` if there were changes and were indeed commited. |
|
Returns `False` if the repo was clean and no changes were commited. |
|
""" |
|
# check if repo is clean (clean => no error, no commit) |
|
try: |
|
CommandRunner.run_or_fail( |
|
["git", "diff", "--exit-code"], stage="CheckRepoClean" |
|
) |
|
return False |
|
except CommandRunner.Exception: |
|
# if it's other kind of error just throw! |
|
pass |
|
|
|
user_name = os.environ.get("GIT_APP_NAME") |
|
user_email = os.environ.get("GIT_APP_EMAIL") |
|
|
|
# Add all files to git staging |
|
CommandRunner.run_or_fail(["git", "add", "-A", "-v"], stage="AddFiles") |
|
|
|
# Reset environment and git config |
|
clean_env = os.environ.copy() |
|
clean_env["LANG"] = "C.UTF-8" |
|
clean_env["GIT_CONFIG_GLOBAL"] = "/dev/null" |
|
clean_env["GIT_CONFIG_NOSYSTEM"] = "1" |
|
|
|
# Commit with settings above |
|
CommandRunner.run_or_fail( |
|
[ |
|
"git", |
|
"-c", |
|
f"user.name={user_name}", |
|
"-c", |
|
f"user.email={user_email}", |
|
"commit", |
|
"-m", |
|
f"feat({scope}): update to {version}", |
|
], |
|
stage="CreateCommit", |
|
env=clean_env, |
|
) |
|
return True |
|
|
|
@staticmethod |
|
def push(branch: str): |
|
CommandRunner.run_or_fail( |
|
["git", "push", "-u", "origin", branch], stage="PushBranch" |
|
) |
|
|
|
@staticmethod |
|
def clean_repo(): |
|
CommandRunner.run_or_fail( |
|
["git", "reset", "--hard", "HEAD"], stage="ResetRepository" |
|
) |
|
CommandRunner.run_or_fail( |
|
["git", "checkout", Git.default_branch], stage="CheckoutDefaultBranch" |
|
) |
|
|
|
|
|
class GitHub: |
|
@staticmethod |
|
def check_newer_tag(repo, current_tag) -> UpdateStatusFalse | UpdateStatusTrue: |
|
# GET /repos/:owner/:repo/git/refs/tags |
|
url = f"https://api.github.com/repos/{repo}/git/refs/tags" |
|
|
|
# Send a GET request to the GitHub API |
|
response = requests.get(url) |
|
current_version = coerce(current_tag) |
|
if current_version is None: |
|
raise ValueError( |
|
f"Stored {current_version} from {repo} does not follow semver" |
|
) |
|
|
|
# If the request was successful |
|
if response.status_code == 200: |
|
# Parse the JSON response |
|
data = response.json() |
|
|
|
if len(data) == 0: |
|
return { |
|
"has_updates": False, |
|
} |
|
|
|
latest_ref = None |
|
latest_version: Optional[Version] = None |
|
for ref in data: |
|
# we find the tag since GitHub returns it as plain git ref |
|
tag_version = coerce(ref["ref"].replace("refs/tags/", "")) |
|
if tag_version is None: |
|
# we skip every tag that is not semver-complaint |
|
continue |
|
if latest_version is None or tag_version.compare(latest_version) > 0: |
|
# if we have a "greater" semver version, set it as latest |
|
latest_version = tag_version |
|
latest_ref = ref |
|
|
|
# raise if no valid semver tag is found |
|
if latest_ref is None or latest_version is None: |
|
raise ValueError(f"No tags following semver found in {repo}") |
|
|
|
# we get the tag since GitHub returns it as plain git ref |
|
latest_tag = latest_ref["ref"].replace("refs/tags/", "") |
|
|
|
if latest_version.compare(current_version) <= 0: |
|
return { |
|
"has_updates": False, |
|
} |
|
|
|
return { |
|
"has_updates": True, |
|
"version": latest_tag, |
|
"compare_url": f"https://github.com/{repo}/compare/{current_tag}...{latest_tag}", |
|
"head_ref": latest_ref["object"]["sha"], |
|
"head_url": f"https://github.com/{repo}/releases/tag/{latest_tag}", |
|
} |
|
else: |
|
# If the request was not successful, raise an exception |
|
raise Exception( |
|
f"GitHub API request failed with status code {response.status_code}: {response.json()}" |
|
) |
|
|
|
@staticmethod |
|
def check_updates(repo, branch, version) -> UpdateStatusFalse | UpdateStatusTrue: |
|
url = f"https://api.github.com/repos/{repo}/compare/{version}...{branch}" |
|
|
|
# Send a GET request to the GitHub API |
|
response = requests.get(url) |
|
|
|
# If the request was successful |
|
if response.status_code == 200: |
|
# Parse the JSON response |
|
data = response.json() |
|
|
|
# If the base is behind the head, there is a newer version |
|
has_updates = data["status"] != "identical" |
|
|
|
if not has_updates: |
|
return { |
|
"has_updates": False, |
|
} |
|
|
|
return { |
|
"has_updates": data["status"] != "identical", |
|
"version": data["commits"][-1]["sha"], |
|
"compare_url": data["permalink_url"], |
|
"head_ref": data["commits"][-1]["sha"], |
|
"head_url": data["commits"][-1]["html_url"], |
|
} |
|
else: |
|
# If the request was not successful, raise an exception |
|
raise Exception( |
|
f"GitHub API request failed with status code {response.status_code}: {response.json()}" |
|
) |
|
|
|
@staticmethod |
|
def create_issue(title: str, body: str) -> None: |
|
cmd = ["gh", "issue", "create", "-t", title, "-b", body] |
|
CommandRunner.run_or_fail(cmd, stage="CreateIssue") |
|
|
|
@staticmethod |
|
def create_pr(branch: str, title: str, body: str) -> None: |
|
# first of all let's check if PR is already open |
|
check_cmd = [ |
|
"gh", |
|
"pr", |
|
"list", |
|
"--state", |
|
"open", |
|
"--head", |
|
branch, |
|
"--json", |
|
"title", |
|
] |
|
# returncode is 0 also if no PRs are found |
|
output = json.loads( |
|
CommandRunner.run_or_fail(check_cmd, stage="CheckPullRequestOpen") |
|
.stdout.decode("utf-8") |
|
.strip() |
|
) |
|
# we have PR in this case! |
|
if len(output) > 0: |
|
return |
|
cmd = [ |
|
"gh", |
|
"pr", |
|
"create", |
|
"-B", |
|
Git.default_branch, |
|
"-H", |
|
branch, |
|
"-t", |
|
title, |
|
"-b", |
|
body, |
|
] |
|
CommandRunner.run_or_fail(cmd, stage="CreatePullRequest") |
|
|
|
|
|
def main(): |
|
# Load the YAML file |
|
with open(DEPS_YAML_FILE, "r") as yaml_file: |
|
data: DependencyYAML = yaml.safe_load(yaml_file) |
|
|
|
if "dependencies" not in data: |
|
raise Exception("dependencies.yml not properly formatted") |
|
|
|
# Cache YAML version |
|
DependencyStore.set(data) |
|
|
|
dependencies = data["dependencies"] |
|
for path in dependencies: |
|
dependency = Dependency(path, dependencies[path]) |
|
dependency.update_or_notify() |
|
|
|
|
|
if __name__ == "__main__": |
|
main()
|
|
|