#
#	changemarks.py
#
#	Script to generate a markdown file per clause modified in a merge request
#
#	(c) 2023 by Miguel Angel Reina Ortega
#	License: BSD 3-Clause License. See the LICENSE file for further details.
#

import argparse
import logging
import os
import re
import sys
from typing import Any, Optional

import requests
from rich import print
from rich.progress import Progress, TextColumn, TimeElapsedColumn
from unidiff import PatchSet


def fetch(url : str, expected_content_type : Optional[str] = None) -> requests.Response:
    """ Perform an HTTP GET on `url` and return the response.

        If `expected_content_type` is given, the response's `Content-Type`
        header must either equal it exactly or have it as its media type
        (the part before any `;` parameters such as `charset`).

        Raises `ValueError` when the status code is not 200 or when the
        content type does not match.
    """
    logging.debug(f"Fetching {url}")
    r = requests.get(url)
    if r.status_code != 200:
        errorMessage = f"Failed attempting to retrieve {url}, status code {r.status_code}"
        logging.error(errorMessage)
        raise ValueError(errorMessage)
    if expected_content_type:
        # A missing header is reported as a mismatch instead of a KeyError
        content_type = r.headers.get('Content-Type', '')
        media_type = content_type.split(';', 1)[0].strip()
        if expected_content_type not in (content_type, media_type):
            errorMessage = f"Unexpected content type retrieving {url}. Expected {expected_content_type}, got {content_type}"
            logging.error(errorMessage)
            raise ValueError(errorMessage)
    return r


def fetch_text(url : str, expected_content_type : Optional[str] = None) -> str:
    """ Fetch `url` (see `fetch`) and return the response body as text.
    """
    return fetch(url, expected_content_type).text


def fetch_json(url : str, expected_content_type : Optional[str] = None) -> Any:
    """ Fetch `url` (see `fetch`) and return the response body decoded from JSON.
    """
    return fetch(url, expected_content_type).json()


def readMDFile(progress:Progress, document:str) -> list[str]:
    """ Read the markdown file `document` and return its lines
        (line endings are kept).

        Prints a message and exits the program with status 1 if the
        file does not exist.
    """
    _taskID = progress.add_task('[blue]Reading document', start=False, total=0)

    # Check if file exists
    if not os.path.exists(document):
        print(f'File {document} does not exist')
        sys.exit(1)

    # Read the file
    with open(document, 'r', encoding='utf-8', errors = 'replace') as f:
        progress.stop_task(_taskID)
        return f.readlines()


def writeMDFile(progress:Progress, mdLines:list[str], document:str, outDirectory:str) -> None:
    """ Write `mdLines` to a file in `outDirectory`.

        Only the base name of `document` is used for the output file name.
    """
    _taskID = progress.add_task('[blue]Writing document', start=False, total=0)

    # Write the file
    target = os.path.join(outDirectory, os.path.basename(document))
    with open(target, 'w', encoding='utf-8', errors = 'replace') as f:
        f.writelines(mdLines)
    progress.stop_task(_taskID)


class Clause():
    ''' Defines a clause of the base document.

        Attributes:
            raw:        the heading line of the clause
            from_id:    1-based number of the first line of the clause
            to_id:      1-based number of the last line of the clause
            clause_nr:  the clause identifier captured from the heading
    '''

    def __init__(self, line, from_id, to_id, clause_nr):
        self.raw = line
        self.from_id = from_id
        self.to_id = to_id
        self.clause_nr = clause_nr


def find_all_clauses(progress:Progress, mdLines:list[str]) -> list[Clause]:
    ''' Scan the body of the document to find all clauses.

        Line numbers stored in the clauses are 1-based. The list always
        starts with the placeholder clause covering whatever precedes the
        first heading, unless that heading adopts the placeholder
        (see the sub-clause rule below).

        Returns the list of Clauses found.
    '''
    _taskID = progress.add_task('[blue]Find all available clauses', start=False, total=0)

    # Raw string: the pattern contains regex escapes (\s, \d, \w, \.)
    clauseregex = re.compile(r'^#+\s(\d(\.\d)*|Annex \w|\w*(\.\d)*).*')
    clauses:list[Clause] = []

    empty = ""
    clause = Clause(empty, 0, 0, empty)

    for index, line in enumerate(mdLines, start=1):
        if not line.startswith('#'):
            continue
        match = clauseregex.match(line)
        if match is None:
            print("Unknown heading")
            continue
        clause_nr = match.group(1)
        if index - 2 == clause.from_id:
            # A heading two lines after the current clause start is treated
            # as a sub-clause: the current clause is moved onto it.
            clause.from_id = index
            clause.raw = line
            clause.clause_nr = clause_nr
        else:
            # The previous clause ends on the line before this heading
            clause.to_id = index - 1
            clauses.append(clause)
            clause = Clause(line, index, index, clause_nr)

    # Append last clause (usually History); it runs to the end of the document
    clause.to_id = len(mdLines)
    clauses.append(clause)

    logging.debug(f"Number of clauses: {len(clauses)}")

    return clauses


class MR:
    """ A merge request of a project hosted on the forge at `root`.

        Constructing an instance performs the network requests: it fetches
        the project details, the merge request details and the raw diff,
        and parses the diff into `patch_set`.
    """

    def __init__(self, project_id, mr_id, root = "https://git.onem2m.org"):
        self.project_id = project_id
        self.mr_id = mr_id
        self.root = root

        self.raw_project_details = fetch_json(self.api_url())
        self.web_url = self.raw_project_details['web_url']

        self.raw_mr_details = fetch_json(self.api_url(f'/merge_requests/{self.mr_id}'))
        self.author = self.raw_mr_details['author']['name']
        self.date = self.raw_mr_details['updated_at']
        self.target_branch = self.raw_mr_details['target_branch']
        self.source_branch = self.raw_mr_details['source_branch']
        self.title = self.raw_mr_details['title']
        self.description = self.raw_mr_details['description']

        self.raw_diff = fetch_text(f'{self.web_url}/-/merge_requests/{self.mr_id}.diff', expected_content_type='text/plain')
        self.patch_set = PatchSet.from_string(self.raw_diff)

    def api_url(self, route : str = "") -> str:
        """ Return the API URL of this project, followed by `route`.

            A leading '/' in `route` is dropped so that the URL does not
            contain an empty path segment ('//').
        """
        return f"{self.root}/api/v4/projects/{self.project_id}/{route.lstrip('/')}"

    def retrieve_text(self, branch: str, filename: str) -> str:
        """ Return the raw content of `filename` on `branch`.
        """
        return fetch_text(f"{self.web_url}/-/raw/{branch}/{filename}")


def find_changed_clauses(progress:Progress, mdLines:list[str], clauses:list[Clause], mr:MR, outDirectory:str ) -> list[Clause]:
    ''' Determine the clauses that have been modified by the merge request
        and generate a markdown file for each of them in `outDirectory`.

        Every file of the merge request's patch set whose source name starts
        with "a/TS" is examined. A clause that matches a hunk is removed
        from `clauses` (the list is modified in place).

        Returns the list of changed Clauses.
    '''
    _taskID = progress.add_task('[blue]Find changed clauses', start=False, total=0)

    changed_clauses:list[Clause] = []
    empty = ""
    changed_clause = Clause(empty, 0, 0, empty)

    def contains(clause:Clause, start_line:int, end_line:int, added:int, removed:int) -> bool:
        # The hunk range is compared against the clause range shifted by the
        # lines added/removed so far in this file.
        return (clause.from_id <= start_line - added + removed) and (clause.to_id + added - removed >= end_line)

    for patched_file in mr.patch_set:
        if not patched_file.source_file.startswith("a/TS"):
            continue
        logging.debug(f"Looking at changes in {patched_file.source_file}")
        lines_added = 0
        lines_removed = 0
        for change in patched_file:
            change_start_line, change_end_line, change_lines_added, change_lines_removed = changeDetails(change)
            lines_added = lines_added + change_lines_added
            lines_removed = lines_removed + change_lines_removed

            # Check the previous changed_clause
            if contains(changed_clause, change_start_line, change_end_line, lines_added, lines_removed):
                generateMDforChange(progress, mdLines, changed_clause, change, outDirectory, True)
                # NOTE(review): this `break` leaves the hunk loop, so any later
                # hunks of this file are not processed - confirm whether
                # `continue` was intended.
                break

            # Check all clauses
            for i, clause in enumerate(clauses):
                if contains(clause, change_start_line, change_end_line, lines_added, lines_removed):
                    # Safe to pop while iterating: the loop is left right away
                    changed_clause = clauses.pop(i)
                    changed_clauses.append(changed_clause)
                    generateMDforChange(progress, mdLines, changed_clause, change, outDirectory, False)
                    break

    for clause in changed_clauses:
        logging.debug(f"Clause {clause.clause_nr} contains modifications")

    return changed_clauses


def changeDetails(change) -> tuple[int, int, int, int]:
    ''' Compute the extent of the modifications inside a hunk.

        `change` is iterated line by line; it must provide `target_start`
        and yield lines with `is_added` / `is_removed` flags.

        Returns a tuple (start line, end line, lines added, lines removed).
    '''
    gap = 0             # lines seen since the last added/removed line
    lines_added = 0
    lines_removed = 0
    change_start_line = change.target_start
    change_end_line = change_start_line
    for line in change:
        if line.is_added or line.is_removed:
            if change_start_line == change.target_start:
                # Start line not moved yet: skip the leading unchanged lines
                change_start_line = change.target_start + gap
            change_end_line = change_end_line + gap
            gap = 0
            if line.is_added:
                lines_added = lines_added + 1
            elif line.is_removed:
                lines_removed = lines_removed + 1
        gap = gap + 1

    change_end_line = change_end_line - lines_removed

    return change_start_line, change_end_line, lines_added, lines_removed


def _markTableRow(row:str, prefix:str, suffix:str) -> str:
    ''' Wrap every non-empty cell of the markdown table row `row` in
        `prefix` / `suffix` and return the row terminated by a newline.
        Empty cells are kept unchanged.
    '''
    cells = row.strip().split("|")
    marked = [prefix + cell.strip() + suffix if cell.strip() != '' else cell for cell in cells]
    return "|".join(marked) + "\n"


def generateMDforChange(progress:Progress, mdLines:list[str],changed_clause:Clause, change, outDirectory:str, existing_clause:bool):
    ''' Generate the markdown file for a clause modified by the hunk `change`.

        Added lines are wrapped in `<span class="underline">`, removed lines
        in `~~`; table rows are marked cell by cell. The result is written to
        `<clause_nr without spaces>.md` in `outDirectory`.

        When `existing_clause` is true the file previously written for the
        clause is read back from `outDirectory` and updated; otherwise the
        clause text is taken from `mdLines`.
    '''
    _taskID = progress.add_task('[blue]Generate MD for changed clauses', start=False, total=0)

    # The same name is used for reading back and for writing
    clauseFile = changed_clause.clause_nr.replace(" ", "") + '.md'

    if not existing_clause:
        # Clause line numbers are 1-based; never start before the first line
        first = max(changed_clause.from_id - 1, 0)
        clauseMDlines: list[str] = [mdLine + '\n' for mdLine in mdLines[first:changed_clause.to_id]]
    else:
        clauseMDlines = readMDFile(progress, os.path.join(outDirectory, clauseFile))

    j = change.target_start - changed_clause.from_id # index gap
    for line in change:
        content = line.value.strip()
        if content != '' and line.is_added:
            if content.startswith("|"):
                # It is a table
                replacement = _markTableRow(line.value, "<span class=\"underline\">", "</span>")
            else:
                replacement = "<span class=\"underline\">" + line.value + "</span>\n\n"
            # An added line replaces the line already present at position j
            clauseMDlines.insert(j, replacement)
            clauseMDlines.pop(j + 1)
        elif content != '' and line.is_removed:
            # A removed line is not part of the clause text: insert it
            if content.startswith("|"):
                # It is a table
                clauseMDlines.insert(j, _markTableRow(line.value, "~~", "~~"))
            else:
                clauseMDlines.insert(j, "~~" + content + "~~")
        j = j + 1

    clauseMDlines.insert(j, "\n\n<br />")

    writeMDFile(progress, clauseMDlines, clauseFile, outDirectory)


def process(document:str, outDirectory:str, mr:MR) -> None:
    ''' Retrieve `document` from the source branch of the merge request and
        generate the markdown files of its changed clauses in `outDirectory`.
    '''
    with Progress(TextColumn('{task.description}'), TimeElapsedColumn()) as progress:
        sourceText = mr.retrieve_text(mr.source_branch, document)
        sourceMdLines = sourceText.splitlines(keepends=False)
        clauses = find_all_clauses(progress, sourceMdLines)
        find_changed_clauses(progress, sourceMdLines, clauses, mr, outDirectory)


def main(args=None):
    ''' Command line entry point.

        `args` is the list of command line arguments; when it is None the
        arguments are taken from `sys.argv`.
    '''
    # Parse command line arguments
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--outdir', '-o', action='store', dest='outDirectory', default = 'out', metavar = '<output directory>', help = 'specify output directory')
    parser.add_argument('rootURL', help="Forge root URL")
    parser.add_argument('projectID', help="Forge project ID")
    parser.add_argument('mergeID', help="Merge IID")

    pargs = parser.parse_args(args)

    # Process documents and print output
    os.makedirs(pargs.outDirectory, exist_ok = True)

    mr = MR(pargs.projectID, pargs.mergeID, pargs.rootURL)
    for patched_file in mr.patch_set:
        if patched_file.source_file.startswith("a/TS"):
            # source_file has the form "a/<name>"; keep the part after "a/"
            filename = patched_file.source_file.split("/")[1]
            process(filename, pargs.outDirectory, mr)
        else:
            logging.debug(f"Cannot process file named {patched_file.source_file}")


if __name__ == '__main__':
    sys.exit(main())