1from collections.abc import Generator
2from dataclasses import dataclass
3import json
4from pathlib import Path
5from typing import Any, TypedDict
6
7from tree_sitter import Node as TreeSitterNode
8
9from sphinx_codelinks.analyse import utils
10from sphinx_codelinks.analyse.models import (
11 MarkedContentType,
12 MarkedRst,
13 NeedIdRefs,
14 OneLineNeed,
15 SourceComment,
16 SourceFile,
17 SourceMap,
18)
19from sphinx_codelinks.analyse.oneline_parser import (
20 OnelineParserInvalidWarning,
21 oneline_parser,
22)
23from sphinx_codelinks.config import (
24 UNIX_NEWLINE,
25 OneLineCommentStyle,
26 SourceAnalyseConfig,
27)
28from sphinx_codelinks.logger import get_logger
29
30logger = get_logger(__name__)
31
32
33def _count(n: int, noun: str) -> str:
34 """Format ``n noun`` with a naive (append-s) plural for progress summaries."""
35 return f"{n} {noun}" if n == 1 else f"{n} {noun}s"
36
37
class AnalyseWarningType(TypedDict):
    """Dictionary shape with the same five fields as ``AnalyseWarning``.

    NOTE(review): nothing in this module produces or consumes this type;
    presumably it is the serialised form of ``AnalyseWarning`` -- confirm
    against the callers.
    """

    # Path of the source file the warning refers to.
    file_path: str
    # Line number the warning refers to.
    lineno: int
    # Human-readable warning message.
    msg: str
    # Kind of marked content the warning belongs to.
    type: str
    # Finer-grained category within ``type``.
    sub_type: str
44
45
@dataclass
class AnalyseWarning:
    """Warning recorded while analysing a source comment.

    Within this module, instances are created by
    ``SourceAnalyse.extract_oneline_need`` when the one-line parser returns
    an ``OnelineParserInvalidWarning`` for a comment line.
    """

    # Path of the source file the comment came from, as a string.
    file_path: str
    # Line number of the offending line (comment start row + row offset + 1).
    lineno: int
    # Message taken from the parser result.
    msg: str
    # Kind of marked content; this module passes ``MarkedContentType.need``.
    type: str
    # Finer-grained category; this module passes the parser result's
    # ``sub_type.value``.
    sub_type: str
53
54
class SourceAnalyse:
    """Extract marked content from the comments of a set of source files.

    A typical run (see :meth:`run`) parses the configured source files with
    tree-sitter, collects their comments, extracts need-id references,
    one-line needs and marked RST blocks from those comments, and merges the
    results into ``all_marked_content``.
    """

    def __init__(
        self,
        analyse_config: SourceAnalyseConfig,
        *,
        name: str = "",
    ) -> None:
        """Set up empty result containers and resolve the git context.

        :param analyse_config: Configuration describing which files to
            analyse and which kinds of marked content to extract.
        :param name: Optional label used to prefix the log summary.
        """
        self.name = name
        self.analyse_config = analyse_config
        self.src_files: list[SourceFile] = []
        self.src_comments: list[SourceComment] = []
        self.need_id_refs: list[NeedIdRefs] = []
        self.oneline_needs: list[OneLineNeed] = []
        self.marked_rst: list[MarkedRst] = []
        self.all_marked_content: list[NeedIdRefs | OneLineNeed | MarkedRst] = []
        # Use explicitly configured git_root if provided, otherwise auto-detect
        if self.analyse_config.git_root is not None:
            self.git_root: Path | None = self.analyse_config.git_root.resolve()
        else:
            self.git_root = utils.locate_git_root(self.analyse_config.src_dir)
        self.git_remote_url: str | None = (
            utils.get_remote_url(self.git_root) if self.git_root else None
        )
        self.git_commit_rev: str | None = (
            utils.get_current_rev(self.git_root) if self.git_root else None
        )
        # Without a git root, the configured source directory is used as the
        # project path.
        self.project_path: Path = (
            self.git_root if self.git_root else self.analyse_config.src_dir
        )
        self.oneline_warnings: list[AnalyseWarning] = []

    def get_src_strings(self) -> Generator[tuple[Path, bytes], Any, None]:  # type: ignore[explicit-any]
        """Yield ``(path, content)`` for every configured text source file.

        Files for which ``utils.is_text_file`` is false are skipped. Content
        is read as UTF-8 without newline translation, every ``\\r\\n`` and
        lone ``\\r`` is normalised to ``\\n``, and the result is re-encoded
        to UTF-8 bytes.
        """
        for src_path in self.analyse_config.src_files:
            if not utils.is_text_file(src_path):
                continue
            # newline="" disables universal-newline translation so that the
            # normalisation below is the only place line endings are changed.
            with src_path.open("r", encoding="utf-8", newline="") as f:
                text = f.read()
            # Normalize all line endings to Unix LF
            text = text.replace("\r\n", "\n").replace("\r", "\n")
            yield src_path, text.encode("utf-8")

    def create_src_objects(self) -> None:
        """Parse all source files and record their comments.

        Files without any extracted comment are not recorded. For the
        others, a ``SourceFile`` (keyed by absolute path) is appended to
        ``src_files`` and its comments are appended to ``src_comments``.
        """
        parser, query = utils.init_tree_sitter(self.analyse_config.comment_type)

        for src_path, src_string in self.get_src_strings():
            comments: list[TreeSitterNode] | None = utils.extract_comments(
                src_string, parser, query
            )
            if not comments:
                continue
            src_comments: list[SourceComment] = [
                SourceComment(node) for node in comments
            ]

            src_file = SourceFile(src_path.absolute())
            src_file.add_comments(src_comments)
            self.src_files.append(src_file)
            self.src_comments.extend(src_comments)

    def extract_marker(
        self,
        text: str,
    ) -> Generator[tuple[str, list[str], int, int, int], None, None]:
        """Find configured need-id-ref markers in ``text``, line by line.

        For every line and every configured marker found in it (first
        occurrence only), yield
        ``(marker, need_ids, row_offset, start_column, end_column)``:

        * ``need_ids`` -- the text after the marker, split on commas and
          whitespace;
        * ``row_offset`` -- 0-based index of the line within ``text``;
        * ``start_column`` -- index in the line right after the marker;
        * ``end_column`` -- ``start_column`` plus the length of the
          stripped text after the marker.
        """
        for row_offset, line in enumerate(text.splitlines()):
            for marker in self.analyse_config.need_id_refs_config.markers:
                marker_idx = line.find(marker)
                if marker_idx == -1:
                    continue
                start_column = marker_idx + len(marker)
                markered_text = line[start_column:].strip()
                need_ids = markered_text.replace(",", " ").split()
                # NOTE(review): the length is taken from the *stripped* text,
                # so whitespace between the marker and the first ID is not
                # reflected in end_column -- confirm that consumers of the
                # source map expect this before changing it.
                end_column = start_column + len(markered_text)
                yield marker, need_ids, row_offset, start_column, end_column

    def _remote_url_for(self, filepath: Path, lineno: int) -> str | None:
        """Return the remote link for ``lineno`` of ``filepath``.

        When both the remote URL and the commit revision are known, the
        link is built by ``utils.form_https_url``; otherwise the plain
        remote URL (possibly ``None``) is returned unchanged.
        """
        remote_url = self.git_remote_url
        if self.git_remote_url and self.git_commit_rev:
            remote_url = utils.form_https_url(
                self.git_remote_url,
                self.git_commit_rev,
                self.project_path,
                filepath,
                lineno,
            )
        return remote_url

    # @Extract need ID references from code comments, IMPL_LNK_1, impl, [FE_LNK]
    def extract_anchors(
        self,
        text: str,
        filepath: Path,
        tagged_scope: TreeSitterNode | None,
        src_comment: SourceComment,
    ) -> list[NeedIdRefs]:
        """Extract need-ids-refs from a comment.

        :param text: Decoded text of the comment.
        :param filepath: Path of the file containing the comment.
        :param tagged_scope: Scope node associated with the comment, if any.
        :param src_comment: The comment the text was taken from.
        :return: One ``NeedIdRefs`` per marker occurrence found in ``text``.
        """
        anchors: list[NeedIdRefs] = []
        for (
            marker,
            need_ids,
            row_offset,
            start_column,
            end_column,
        ) in self.extract_marker(text):
            # 1-based line number in the file; the source map keeps the
            # 0-based row.
            lineno = src_comment.node.start_point.row + row_offset + 1
            remote_url = self._remote_url_for(filepath, lineno)
            source_map: SourceMap = {
                "start": {
                    "row": lineno - 1,
                    "column": start_column,
                },
                "end": {
                    "row": lineno - 1,
                    "column": end_column,
                },
            }
            anchors.append(
                NeedIdRefs(
                    filepath,
                    remote_url,
                    source_map,
                    src_comment,
                    tagged_scope,
                    need_ids,
                    marker,
                )
            )
        return anchors

    def extract_oneline_need(
        self,
        text: str,
        src_comment: SourceComment,
        oneline_comment_style: OneLineCommentStyle,
    ) -> Generator[tuple[dict[str, str | list[str] | int], int], None, None]:
        """Run the one-line parser over every line of a comment.

        Yields ``(resolved, row_offset)`` for each line the parser resolves
        into a need, where ``row_offset`` is the 0-based index of the line
        within ``text``. Lines the parser returns nothing for are skipped.
        Lines reported as ``OnelineParserInvalidWarning`` are recorded in
        ``oneline_warnings`` (only when the comment is attached to a source
        file) and are not yielded.
        """
        lines = text.splitlines(keepends=True)
        if len(lines) == 1:
            # single line comment has no newline char in the extracted comment
            lines[0] = f"{lines[0]}{UNIX_NEWLINE}"

        for row_offset, line in enumerate(lines):
            resolved = oneline_parser(line, oneline_comment_style)
            if not resolved:
                continue
            if isinstance(resolved, OnelineParserInvalidWarning):
                # A warning needs a file path to be reportable.
                if not src_comment.source_file:
                    continue
                lineno = src_comment.node.start_point.row + row_offset + 1
                self.oneline_warnings.append(
                    AnalyseWarning(
                        str(src_comment.source_file.filepath),
                        lineno,
                        resolved.msg,
                        MarkedContentType.need,
                        resolved.sub_type.value,
                    )
                )
                continue
            yield resolved, row_offset

    # @Extract one-line traceability needs from comments, IMPL_ONE_1, impl, [FE_DEF, FE_CMT]
    def extract_oneline_needs(
        self,
        text: str,
        filepath: Path,
        tagged_scope: TreeSitterNode | None,
        src_comment: SourceComment,
        oneline_comment_style: OneLineCommentStyle,
    ) -> list[OneLineNeed]:
        """Extract one-line needs from a comment.

        :param text: Decoded text of the comment.
        :param filepath: Path of the file containing the comment.
        :param tagged_scope: Scope node associated with the comment, if any.
        :param src_comment: The comment the text was taken from.
        :param oneline_comment_style: Style definition handed to the
            one-line parser.
        :return: One ``OneLineNeed`` per line resolved by the parser.
        """
        oneline_needs: list[OneLineNeed] = []
        for resolved, row_offset in self.extract_oneline_need(
            text, src_comment, oneline_comment_style
        ):
            lineno = src_comment.node.start_point.row + row_offset + 1
            remote_url = self._remote_url_for(filepath, lineno)
            source_map: SourceMap = {
                "start": {
                    "row": lineno - 1,
                    "column": resolved["start_column"],  # type: ignore[typeddict-item] # dynamic keys
                },
                "end": {
                    "row": lineno - 1,
                    "column": resolved["end_column"],  # type: ignore[typeddict-item] # dynamic keys
                },
            }
            # The column entries now live in the source map; drop them so
            # that only the need fields are passed on.
            del resolved["start_column"]
            del resolved["end_column"]
            oneline_needs.append(
                OneLineNeed(
                    filepath,
                    remote_url,
                    source_map,
                    src_comment,
                    tagged_scope,
                    resolved,  # type: ignore[arg-type] # int arguments were deleted
                )
            )
        return oneline_needs

    # @Extract marked reStructuredText blocks from comments, IMPL_MRST_1, impl, [FE_RST_EXTRACTION]
    def extract_marked_rst(
        self,
        text: str,
        filepath: Path,
        tagged_scope: TreeSitterNode | None,
        src_comment: SourceComment,
    ) -> MarkedRst | None:
        """Extract marked rst from a comment.

        Presumably, only one marked rst text in a comment.

        :param text: Decoded text of the comment.
        :param filepath: Path of the file containing the comment.
        :param tagged_scope: Scope node associated with the comment, if any.
        :param src_comment: The comment the text was taken from.
        :return: The extracted block, or ``None`` when ``utils.extract_rst``
            finds nothing between the configured start and end sequences.
        """
        extracted_rst = utils.extract_rst(
            text,
            self.analyse_config.marked_rst_config.start_sequence,
            self.analyse_config.marked_rst_config.end_sequence,
        )
        if not extracted_rst:
            return None
        if UNIX_NEWLINE in extracted_rst["rst_text"]:
            # Multi-line text: leading "*" sequences are removed (presumably
            # block-comment decoration -- see utils.remove_leading_sequences).
            rst_text = utils.remove_leading_sequences(extracted_rst["rst_text"], ["*"])
        else:
            rst_text = extracted_rst["rst_text"]
        lineno = src_comment.node.start_point.row + extracted_rst["row_offset"] + 1
        remote_url = self._remote_url_for(filepath, lineno)
        source_map: SourceMap = {
            "start": {
                "row": lineno - 1,
                "column": extracted_rst["start_idx"],
            },
            "end": {
                "row": lineno - 1,
                "column": extracted_rst["end_idx"],
            },
        }
        return MarkedRst(
            filepath,
            remote_url,
            source_map,
            src_comment,
            tagged_scope,
            rst_text,
        )

    def extract_marked_content(self) -> None:
        """Run the enabled extractors over every collected comment.

        Comments without text or without an attached source file are
        skipped. Which extractors run is controlled by the
        ``get_need_id_refs``, ``get_oneline_needs`` and ``get_rst`` flags of
        the configuration; results are appended to ``need_id_refs``,
        ``oneline_needs`` and ``marked_rst`` respectively.
        """
        for src_comment in self.src_comments:
            text = (
                src_comment.node.text.decode("utf-8") if src_comment.node.text else None
            )
            if not text:
                continue
            filepath = (
                src_comment.source_file.filepath if src_comment.source_file else None
            )
            if not filepath:
                continue
            tagged_scope: TreeSitterNode | None = utils.find_associated_scope(
                src_comment.node, self.analyse_config.comment_type
            )
            if self.analyse_config.get_need_id_refs:
                anchors = self.extract_anchors(
                    text, filepath, tagged_scope, src_comment
                )
                self.need_id_refs.extend(anchors)

            if self.analyse_config.get_oneline_needs:
                oneline_needs = self.extract_oneline_needs(
                    text,
                    filepath,
                    tagged_scope,
                    src_comment,
                    self.analyse_config.oneline_comment_style,
                )
                self.oneline_needs.extend(oneline_needs)
            if self.analyse_config.get_rst:
                marked_rst = self.extract_marked_rst(
                    text, filepath, tagged_scope, src_comment
                )
                if marked_rst:
                    self.marked_rst.append(marked_rst)

    def merge_marked_content(self) -> None:
        """Append all extracted items to ``all_marked_content`` and sort it.

        ``oneline_needs`` is sorted in place by start row first; the merged
        list is then sorted by ``(filepath, start row)``. Items are appended
        to whatever ``all_marked_content`` already holds.
        """
        self.all_marked_content.extend(self.need_id_refs)
        self.oneline_needs.sort(key=lambda x: x.source_map["start"]["row"])
        self.all_marked_content.extend(self.oneline_needs)
        self.all_marked_content.extend(self.marked_rst)
        self.all_marked_content.sort(
            key=lambda x: (x.filepath, x.source_map["start"]["row"])
        )

    def dump_marked_content(self, outdir: Path) -> None:
        """Write ``all_marked_content`` to ``outdir / "marked_content.json"``.

        :param outdir: Target directory; created (with parents) if missing.
        """
        output_path = outdir / "marked_content.json"
        # exist_ok avoids the race between checking for the directory and
        # creating it.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        to_dump = [
            marked_content.to_dict() for marked_content in self.all_marked_content
        ]
        # Explicit encoding so the output does not depend on the locale.
        with output_path.open("w", encoding="utf-8") as f:
            json.dump(to_dump, f)

    def run(self) -> None:
        """Collect comments, extract and merge marked content, log a summary."""
        self.create_src_objects()
        self.extract_marked_content()
        self.merge_marked_content()
        self._log_summary()

    def _log_summary(self) -> None:
        """Emit a per-project marker (default-visible) plus a -v breakdown.

        The info record carries the file and marker counts; the debug record
        breaks the result down by comment and marked-content kind.
        """
        label = f"codelinks [{self.name}]" if self.name else "codelinks"
        logger.info(
            f"{label}: {_count(len(self.src_files), 'file')}, "
            f"{_count(len(self.all_marked_content), 'marker')}"
        )
        logger.debug(
            f"{label}: {_count(len(self.src_comments), 'comment')}, "
            f"{_count(len(self.oneline_needs), 'oneline need')}, "
            f"{_count(len(self.need_id_refs), 'id-ref')}, "
            f"{_count(len(self.marked_rst), 'marked-rst block')}"
        )