from collections.abc import Generator
from dataclasses import dataclass
import json
import logging
from pathlib import Path
from typing import Any, TypedDict

from tree_sitter import Node as TreeSitterNode

from sphinx_codelinks.analyse import utils
from sphinx_codelinks.analyse.models import (
    MarkedContentType,
    MarkedRst,
    NeedIdRefs,
    OneLineNeed,
    SourceComment,
    SourceFile,
    SourceMap,
)
from sphinx_codelinks.analyse.oneline_parser import (
    OnelineParserInvalidWarning,
    oneline_parser,
)
from sphinx_codelinks.config import (
    UNIX_NEWLINE,
    OneLineCommentStyle,
    SourceAnalyseConfig,
)

# initialize logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# log to the console
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logger.addHandler(console)


class AnalyseWarningType(TypedDict):
    file_path: str
    lineno: int
    msg: str
    type: str
    sub_type: str


@dataclass
class AnalyseWarning:
    file_path: str
    lineno: int
    msg: str
    type: str
    sub_type: str


class SourceAnalyse:
    """Analyse source files and collect marked content from their comments."""

    def __init__(
        self,
        analyse_config: SourceAnalyseConfig,
    ) -> None:
        self.analyse_config = analyse_config
        self.src_files: list[SourceFile] = []
        self.src_comments: list[SourceComment] = []
        self.need_id_refs: list[NeedIdRefs] = []
        self.oneline_needs: list[OneLineNeed] = []
        self.marked_rst: list[MarkedRst] = []
        self.all_marked_content: list[NeedIdRefs | OneLineNeed | MarkedRst] = []
        # Use explicitly configured git_root if provided, otherwise auto-detect
        if self.analyse_config.git_root is not None:
            self.git_root: Path | None = self.analyse_config.git_root.resolve()
        else:
            self.git_root = utils.locate_git_root(self.analyse_config.src_dir)
        self.git_remote_url: str | None = (
            utils.get_remote_url(self.git_root) if self.git_root else None
        )
        self.git_commit_rev: str | None = (
            utils.get_current_rev(self.git_root) if self.git_root else None
        )
        self.project_path: Path = (
            self.git_root if self.git_root else self.analyse_config.src_dir
        )
        self.oneline_warnings: list[AnalyseWarning] = []

    def get_src_strings(self) -> Generator[tuple[Path, bytes], Any, None]:  # type: ignore[explicit-any]
        """Load source files and extract their content."""
        for src_path in self.analyse_config.src_files:
            if not utils.is_text_file(src_path):
                continue
            with src_path.open("r", encoding="utf-8", newline="") as f:
                # Normalize all line endings to Unix LF
                text = f.read()
                text = text.replace("\r\n", "\n").replace("\r", "\n")
                yield src_path, text.encode("utf-8")

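    # Sketch of what get_src_strings() yields (the path and content below are
    # hypothetical, for illustration only):
    #   (Path("src/main.c"), b"/* @need: REQ_1, REQ_2 */\nint main(void) {}\n")
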
    def create_src_objects(self) -> None:
        """Parse the configured source files and collect their comments."""
        parser, query = utils.init_tree_sitter(self.analyse_config.comment_type)

        for src_path, src_string in self.get_src_strings():
            comments: list[TreeSitterNode] | None = utils.extract_comments(
                src_string, parser, query
            )
            if not comments:
                continue
            src_comments: list[SourceComment] = [
                SourceComment(node) for node in comments
            ]

            src_file = SourceFile(src_path.absolute())
            src_file.add_comments(src_comments)
            self.src_files.append(src_file)
            self.src_comments.extend(src_comments)

        logger.info(f"Source files loaded: {len(self.src_files)}")
        logger.info(f"Source comments extracted: {len(self.src_comments)}")

    def extract_marker(
        self,
        text: str,
    ) -> Generator[tuple[str, list[str], int, int, int], None, None]:
        """Yield marker matches as (marker, need_ids, row_offset, start_column, end_column)."""
        lines = text.splitlines()
        row_offset = 0
        for line in lines:
            for marker in self.analyse_config.need_id_refs_config.markers:
                marker_idx = line.find(marker)
                if marker_idx == -1:
                    continue
                markered_text = line[marker_idx + len(marker) :].strip()
                need_ids = markered_text.replace(",", " ").split()
                start_column = marker_idx + len(marker)
                end_column = start_column + len(markered_text)
                yield marker, need_ids, row_offset, start_column, end_column
            row_offset += 1

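    # Example of extract_marker() applied to one comment line (a sketch; the
    # real marker strings come from need_id_refs_config.markers, "@need:" is
    # hypothetical):
    #   "// @need: REQ_1, REQ_2" -> ("@need:", ["REQ_1", "REQ_2"], 0, 9, 21)
    # where 9 is the column just after the marker and 21 the end of the
    # reference text.
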
    # @Extract need ID references from code comments, IMPL_LNK_1, impl, [FE_LNK]
    def extract_anchors(
        self,
        text: str,
        filepath: Path,
        tagged_scope: TreeSitterNode | None,
        src_comment: SourceComment,
    ) -> list[NeedIdRefs]:
        """Extract need-id-refs from a comment."""
        anchors: list[NeedIdRefs] = []
        for (
            marker,
            need_ids,
            row_offset,
            start_column,
            end_column,
        ) in self.extract_marker(text):
            lineno = src_comment.node.start_point.row + row_offset + 1
            remote_url = self.git_remote_url
            if self.git_remote_url and self.git_commit_rev:
                remote_url = utils.form_https_url(
                    self.git_remote_url,
                    self.git_commit_rev,
                    self.project_path,
                    filepath,
                    lineno,
                )
            source_map: SourceMap = {
                "start": {
                    "row": lineno - 1,
                    "column": start_column,
                },
                "end": {
                    "row": lineno - 1,
                    "column": end_column,
                },
            }
            anchors.append(
                NeedIdRefs(
                    filepath,
                    remote_url,
                    source_map,
                    src_comment,
                    tagged_scope,
                    need_ids,
                    marker,
                )
            )
        return anchors

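    # Sketch of the SourceMap built above: rows are zero-based while lineno is
    # one-based, so a reference found on source line 42 between columns 9 and 21
    # is recorded as:
    #   {"start": {"row": 41, "column": 9}, "end": {"row": 41, "column": 21}}
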
    def extract_oneline_need(
        self,
        text: str,
        src_comment: SourceComment,
        oneline_comment_style: OneLineCommentStyle,
    ) -> Generator[tuple[dict[str, str | list[str] | int], int]]:
        """Yield each resolved one-line need with its row offset; collect warnings for invalid ones."""
        lines = text.splitlines(keepends=True)
        row_offset = 0
        if len(lines) == 1:
            # a single-line comment has no newline char in the extracted text
            lines[0] = f"{lines[0]}{UNIX_NEWLINE}"

        for line in lines:
            resolved = oneline_parser(line, oneline_comment_style)
            if not resolved:
                row_offset += 1
                continue
            if isinstance(resolved, OnelineParserInvalidWarning):
                if not src_comment.source_file:
                    row_offset += 1
                    continue
                lineno = src_comment.node.start_point.row + row_offset + 1
                warning = AnalyseWarning(
                    str(src_comment.source_file.filepath),
                    lineno,
                    resolved.msg,
                    MarkedContentType.need,
                    resolved.sub_type.value,
                )
                self.oneline_warnings.append(warning)
                row_offset += 1
                continue
            yield resolved, row_offset
            row_offset += 1

    # @Extract one-line traceability needs from comments, IMPL_ONE_1, impl, [FE_DEF, FE_CMT]
    def extract_oneline_needs(
        self,
        text: str,
        filepath: Path,
        tagged_scope: TreeSitterNode | None,
        src_comment: SourceComment,
        oneline_comment_style: OneLineCommentStyle,
    ) -> list[OneLineNeed]:
        """Extract one-line needs from a comment."""
        row_offset = 0
        oneline_needs = []
        for resolved, row_offset in self.extract_oneline_need(
            text, src_comment, oneline_comment_style
        ):
            lineno = src_comment.node.start_point.row + row_offset + 1
            remote_url = self.git_remote_url
            if self.git_remote_url and self.git_commit_rev:
                remote_url = utils.form_https_url(
                    self.git_remote_url,
                    self.git_commit_rev,
                    self.project_path,
                    filepath,
                    lineno,
                )
            source_map: SourceMap = {
                "start": {
                    "row": lineno - 1,
                    "column": resolved["start_column"],  # type: ignore[typeddict-item] # dynamic keys
                },
                "end": {
                    "row": lineno - 1,
                    "column": resolved["end_column"],  # type: ignore[typeddict-item] # dynamic keys
                },
            }
            del resolved["start_column"]
            del resolved["end_column"]
            oneline_needs.append(
                OneLineNeed(
                    filepath,
                    remote_url,
                    source_map,
                    src_comment,
                    tagged_scope,
                    resolved,  # type: ignore[arg-type] # int arguments were deleted
                )
            )
        return oneline_needs

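    # Example of the one-line need format handled above (a sketch; the exact
    # field layout is defined by OneLineCommentStyle in the project config, the
    # shape below mirrors the IMPL_* comments in this file):
    #   # @Extract one-line traceability needs from comments, IMPL_ONE_1, impl, [FE_DEF, FE_CMT]
    # i.e. "@<title>, <need id>, <need type>, [<linked need ids>]";
    # oneline_parser() resolves such a line into a dict whose start/end columns
    # are moved into the source map before the rest is passed to OneLineNeed.
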
    # @Extract marked reStructuredText blocks from comments, IMPL_MRST_1, impl, [FE_RST_EXTRACTION]
    def extract_marked_rst(
        self,
        text: str,
        filepath: Path,
        tagged_scope: TreeSitterNode | None,
        src_comment: SourceComment,
    ) -> MarkedRst | None:
        """Extract marked rst from a comment.

        A comment is assumed to contain at most one marked rst block.
        """
        extracted_rst = utils.extract_rst(
            text,
            self.analyse_config.marked_rst_config.start_sequence,
            self.analyse_config.marked_rst_config.end_sequence,
        )
        if not extracted_rst:
            return None
        if UNIX_NEWLINE in extracted_rst["rst_text"]:
            rst_text = utils.remove_leading_sequences(extracted_rst["rst_text"], ["*"])
        else:
            rst_text = extracted_rst["rst_text"]
        lineno = src_comment.node.start_point.row + extracted_rst["row_offset"] + 1
        remote_url = self.git_remote_url
        if self.git_remote_url and self.git_commit_rev:
            remote_url = utils.form_https_url(
                self.git_remote_url,
                self.git_commit_rev,
                self.project_path,
                filepath,
                lineno,
            )
        source_map: SourceMap = {
            "start": {
                "row": lineno - 1,
                "column": extracted_rst["start_idx"],
            },
            "end": {
                "row": lineno - 1,
                "column": extracted_rst["end_idx"],
            },
        }
        return MarkedRst(
            filepath,
            remote_url,
            source_map,
            src_comment,
            tagged_scope,
            rst_text,
        )

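    # Example of a marked rst comment handled above (a sketch; the actual start
    # and end sequences come from marked_rst_config, "@rst"/"@endrst" and the
    # directive are hypothetical):
    #   /*
    #    * @rst
    #    * .. impl:: Marked rst example
    #    *    :id: IMPL_RST_1
    #    * @endrst
    #    */
    # Leading "*" characters of multi-line blocks are stripped via
    # remove_leading_sequences() before the text is stored in MarkedRst.
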
    def extract_marked_content(self) -> None:
        """Extract the configured marked-content types from all collected comments."""
        for src_comment in self.src_comments:
            text = (
                src_comment.node.text.decode("utf-8") if src_comment.node.text else None
            )
            if not text:
                continue
            filepath = (
                src_comment.source_file.filepath if src_comment.source_file else None
            )
            if not filepath:
                continue
            tagged_scope: TreeSitterNode | None = utils.find_associated_scope(
                src_comment.node, self.analyse_config.comment_type
            )
            if self.analyse_config.get_need_id_refs:
                anchors = self.extract_anchors(
                    text, filepath, tagged_scope, src_comment
                )
                self.need_id_refs.extend(anchors)

            if self.analyse_config.get_oneline_needs:
                oneline_needs = self.extract_oneline_needs(
                    text,
                    filepath,
                    tagged_scope,
                    src_comment,
                    self.analyse_config.oneline_comment_style,
                )
                self.oneline_needs.extend(oneline_needs)
            if self.analyse_config.get_rst:
                marked_rst = self.extract_marked_rst(
                    text, filepath, tagged_scope, src_comment
                )
                if marked_rst:
                    self.marked_rst.append(marked_rst)

        if self.analyse_config.get_need_id_refs:
            logger.info(f"Need-id-refs extracted: {len(self.need_id_refs)}")
        if self.analyse_config.get_oneline_needs:
            logger.info(f"Oneline needs extracted: {len(self.oneline_needs)}")
        if self.analyse_config.get_rst:
            logger.info(f"Marked rst extracted: {len(self.marked_rst)}")

    def merge_marked_content(self) -> None:
        """Merge all marked content into one list, sorted by file and start row."""
        self.all_marked_content.extend(self.need_id_refs)
        self.oneline_needs.sort(key=lambda x: x.source_map["start"]["row"])
        self.all_marked_content.extend(self.oneline_needs)
        self.all_marked_content.extend(self.marked_rst)
        self.all_marked_content.sort(
            key=lambda x: (x.filepath, x.source_map["start"]["row"])
        )

    def dump_marked_content(self, outdir: Path) -> None:
        """Dump all marked content to marked_content.json in the given output directory."""
        output_path = outdir / "marked_content.json"
        if not output_path.parent.exists():
            output_path.parent.mkdir(parents=True)
        to_dump = [
            marked_content.to_dict() for marked_content in self.all_marked_content
        ]
        with output_path.open("w") as f:
            json.dump(to_dump, f)
        logger.info(f"Marked content dumped to {output_path}")

    def run(self) -> None:
        """Run the full analysis: parse sources, then extract and merge marked content."""
        self.create_src_objects()
        self.extract_marked_content()
        self.merge_marked_content()
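

# Typical usage (a minimal sketch; the exact SourceAnalyseConfig fields are an
# assumption based on the attributes accessed above, e.g. src_dir, src_files,
# comment_type and the get_* flags):
#
#   config = SourceAnalyseConfig(...)
#   analyse = SourceAnalyse(config)
#   analyse.run()
#   analyse.dump_marked_content(Path("analysis_out"))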