1from collections.abc import Generator
  2from dataclasses import dataclass
  3import json
  4from pathlib import Path
  5from typing import Any, TypedDict
  6
  7from tree_sitter import Node as TreeSitterNode
  8
  9from sphinx_codelinks.analyse import utils
 10from sphinx_codelinks.analyse.models import (
 11    MarkedContentType,
 12    MarkedRst,
 13    NeedIdRefs,
 14    OneLineNeed,
 15    SourceComment,
 16    SourceFile,
 17    SourceMap,
 18)
 19from sphinx_codelinks.analyse.oneline_parser import (
 20    OnelineParserInvalidWarning,
 21    oneline_parser,
 22)
 23from sphinx_codelinks.config import (
 24    UNIX_NEWLINE,
 25    OneLineCommentStyle,
 26    SourceAnalyseConfig,
 27)
 28from sphinx_codelinks.logger import get_logger
 29
 30logger = get_logger(__name__)
 31
 32
 33def _count(n: int, noun: str) -> str:
 34    """Format ``n noun`` with a naive (append-s) plural for progress summaries."""
 35    return f"{n} {noun}" if n == 1 else f"{n} {noun}s"
 36
 37
 38class AnalyseWarningType(TypedDict):
 39    file_path: str
 40    lineno: int
 41    msg: str
 42    type: str
 43    sub_type: str
 44
 45
 46@dataclass
 47class AnalyseWarning:
 48    file_path: str
 49    lineno: int
 50    msg: str
 51    type: str
 52    sub_type: str
 53
 54
 55class SourceAnalyse:
 56    def __init__(
 57        self,
 58        analyse_config: SourceAnalyseConfig,
 59        *,
 60        name: str = "",
 61    ) -> None:
 62        self.name = name
 63        self.analyse_config = analyse_config
 64        self.src_files: list[SourceFile] = []
 65        self.src_comments: list[SourceComment] = []
 66        self.need_id_refs: list[NeedIdRefs] = []
 67        self.oneline_needs: list[OneLineNeed] = []
 68        self.marked_rst: list[MarkedRst] = []
 69        self.all_marked_content: list[NeedIdRefs | OneLineNeed | MarkedRst] = []
 70        # Use explicitly configured git_root if provided, otherwise auto-detect
 71        if self.analyse_config.git_root is not None:
 72            self.git_root: Path | None = self.analyse_config.git_root.resolve()
 73        else:
 74            self.git_root = utils.locate_git_root(self.analyse_config.src_dir)
 75        self.git_remote_url: str | None = (
 76            utils.get_remote_url(self.git_root) if self.git_root else None
 77        )
 78        self.git_commit_rev: str | None = (
 79            utils.get_current_rev(self.git_root) if self.git_root else None
 80        )
 81        self.project_path: Path = (
 82            self.git_root if self.git_root else self.analyse_config.src_dir
 83        )
 84        self.oneline_warnings: list[AnalyseWarning] = []
 85
 86    def get_src_strings(self) -> Generator[tuple[Path, bytes], Any, None]:  # type: ignore[explicit-any]
 87        """Load source files and extract their content."""
 88        for src_path in self.analyse_config.src_files:
 89            if not utils.is_text_file(src_path):
 90                continue
 91            with src_path.open("r", encoding="utf-8", newline="") as f:
 92                # Normalize all line endings to Unix LF
 93                text = f.read()
 94            text = text.replace("\r\n", "\n").replace("\r", "\n")
 95            yield src_path, text.encode("utf-8")
 96
 97    def create_src_objects(self) -> None:
 98        parser, query = utils.init_tree_sitter(self.analyse_config.comment_type)
 99
100        for src_path, src_string in self.get_src_strings():
101            comments: list[TreeSitterNode] | None = utils.extract_comments(
102                src_string, parser, query
103            )
104            if not comments:
105                continue
106            src_comments: list[SourceComment] = [
107                SourceComment(node) for node in comments
108            ]
109
110            src_file = SourceFile(src_path.absolute())
111            src_file.add_comments(src_comments)
112            self.src_files.append(src_file)
113            self.src_comments.extend(src_comments)
114
115    def extract_marker(
116        self,
117        text: str,
118    ) -> Generator[tuple[str, list[str], int, int, int], None, None]:
119        lines = text.splitlines()
120        row_offset = 0
121        for line in lines:
122            for marker in self.analyse_config.need_id_refs_config.markers:
123                marker_idx = line.find(marker)
124                if marker_idx == -1:
125                    continue
126                markered_text = line[marker_idx + len(marker) :].strip()
127                need_ids = markered_text.replace(",", " ").split()
128                start_column = marker_idx + len(marker)
129                end_column = start_column + len(markered_text)
130                yield marker, need_ids, row_offset, start_column, end_column
131            row_offset += 1
132
[docs]133    # @Extract need ID references from code comments, IMPL_LNK_1, impl, [FE_LNK]
134    def extract_anchors(
135        self,
136        text: str,
137        filepath: Path,
138        tagged_scope: TreeSitterNode | None,
139        src_comment: SourceComment,
140    ) -> list[NeedIdRefs]:
141        """Extract need-ids-refs from a comment."""
142        anchors: list[NeedIdRefs] = []
143        for (
144            marker,
145            need_ids,
146            row_offset,
147            start_column,
148            end_column,
149        ) in self.extract_marker(text):
150            lineno = src_comment.node.start_point.row + row_offset + 1
151            remote_url = self.git_remote_url
152            if self.git_remote_url and self.git_commit_rev:
153                remote_url = utils.form_https_url(
154                    self.git_remote_url,
155                    self.git_commit_rev,
156                    self.project_path,
157                    filepath,
158                    lineno,
159                )
160            source_map: SourceMap = {
161                "start": {
162                    "row": lineno - 1,
163                    "column": start_column,
164                },
165                "end": {
166                    "row": lineno - 1,
167                    "column": end_column,
168                },
169            }
170            anchors.append(
171                NeedIdRefs(
172                    filepath,
173                    remote_url,
174                    source_map,
175                    src_comment,
176                    tagged_scope,
177                    need_ids,
178                    marker,
179                )
180            )
181        return anchors
182
183    def extract_oneline_need(
184        self,
185        text: str,
186        src_comment: SourceComment,
187        oneline_comment_style: OneLineCommentStyle,
188    ) -> Generator[tuple[dict[str, str | list[str] | int], int]]:
189        lines = text.splitlines(keepends=True)
190        row_offset = 0
191        if len(lines) == 1:
192            # single line comment has no newline char in the extracted comment
193            lines[0] = f"{lines[0]}{UNIX_NEWLINE}"
194
195        for line in lines:
196            resolved = oneline_parser(line, oneline_comment_style)
197            if not resolved:
198                row_offset += 1
199                continue
200            if isinstance(resolved, OnelineParserInvalidWarning):
201                if not src_comment.source_file:
202                    row_offset += 1
203                    continue
204                lineno = src_comment.node.start_point.row + row_offset + 1
205                warning = AnalyseWarning(
206                    str(src_comment.source_file.filepath),
207                    lineno,
208                    resolved.msg,
209                    MarkedContentType.need,
210                    resolved.sub_type.value,
211                )
212                self.oneline_warnings.append(warning)
213                row_offset += 1
214                continue
215            yield resolved, row_offset
216            row_offset += 1
217
[docs]218    # @Extract one-line traceability needs from comments, IMPL_ONE_1, impl, [FE_DEF, FE_CMT]
219    def extract_oneline_needs(
220        self,
221        text: str,
222        filepath: Path,
223        tagged_scope: TreeSitterNode | None,
224        src_comment: SourceComment,
225        oneline_comment_style: OneLineCommentStyle,
226    ) -> list[OneLineNeed]:
227        row_offset = 0
228        oneline_needs = []
229        for resolved, row_offset in self.extract_oneline_need(
230            text, src_comment, oneline_comment_style
231        ):
232            lineno = src_comment.node.start_point.row + row_offset + 1
233            remote_url = self.git_remote_url
234            if self.git_remote_url and self.git_commit_rev:
235                remote_url = utils.form_https_url(
236                    self.git_remote_url,
237                    self.git_commit_rev,
238                    self.project_path,
239                    filepath,
240                    lineno,
241                )
242            source_map: SourceMap = {
243                "start": {
244                    "row": lineno - 1,
245                    "column": resolved["start_column"],  # type: ignore[typeddict-item]  # dynamic keys
246                },
247                "end": {
248                    "row": lineno - 1,
249                    "column": resolved["end_column"],  # type: ignore[typeddict-item]  # dynamic keys
250                },
251            }
252            del resolved["start_column"]
253            del resolved["end_column"]
254            oneline_needs.append(
255                OneLineNeed(
256                    filepath,
257                    remote_url,
258                    source_map,
259                    src_comment,
260                    tagged_scope,
261                    resolved,  # type: ignore[arg-type] # int arguments were deleted
262                )
263            )
264        return oneline_needs
265
[docs]266    # @Extract marked reStructuredText blocks from comments, IMPL_MRST_1, impl, [FE_RST_EXTRACTION]
267    def extract_marked_rst(
268        self,
269        text: str,
270        filepath: Path,
271        tagged_scope: TreeSitterNode | None,
272        src_comment: SourceComment,
273    ) -> MarkedRst | None:
274        """Extract marked rst from a comment.
275
276        Presumably, only one marked rst text in a comment.
277        """
278        extracted_rst = utils.extract_rst(
279            text,
280            self.analyse_config.marked_rst_config.start_sequence,
281            self.analyse_config.marked_rst_config.end_sequence,
282        )
283        if not extracted_rst:
284            return None
285        if UNIX_NEWLINE in extracted_rst["rst_text"]:
286            rst_text = utils.remove_leading_sequences(extracted_rst["rst_text"], ["*"])
287        else:
288            rst_text = extracted_rst["rst_text"]
289        lineno = src_comment.node.start_point.row + extracted_rst["row_offset"] + 1
290        remote_url = self.git_remote_url
291        if self.git_remote_url and self.git_commit_rev:
292            remote_url = utils.form_https_url(
293                self.git_remote_url,
294                self.git_commit_rev,
295                self.project_path,
296                filepath,
297                lineno,
298            )
299        source_map: SourceMap = {
300            "start": {
301                "row": lineno - 1,
302                "column": extracted_rst["start_idx"],
303            },
304            "end": {
305                "row": lineno - 1,
306                "column": extracted_rst["end_idx"],
307            },
308        }
309        return MarkedRst(
310            filepath,
311            remote_url,
312            source_map,
313            src_comment,
314            tagged_scope,
315            rst_text,
316        )
317
318    def extract_marked_content(self) -> None:
319        for src_comment in self.src_comments:
320            text = (
321                src_comment.node.text.decode("utf-8") if src_comment.node.text else None
322            )
323            if not text:
324                continue
325            filepath = (
326                src_comment.source_file.filepath if src_comment.source_file else None
327            )
328            if not filepath:
329                continue
330            tagged_scope: TreeSitterNode | None = utils.find_associated_scope(
331                src_comment.node, self.analyse_config.comment_type
332            )
333            if self.analyse_config.get_need_id_refs:
334                anchors = self.extract_anchors(
335                    text, filepath, tagged_scope, src_comment
336                )
337                self.need_id_refs.extend(anchors)
338
339            if self.analyse_config.get_oneline_needs:
340                oneline_needs = self.extract_oneline_needs(
341                    text,
342                    filepath,
343                    tagged_scope,
344                    src_comment,
345                    self.analyse_config.oneline_comment_style,
346                )
347                self.oneline_needs.extend(oneline_needs)
348            if self.analyse_config.get_rst:
349                marked_rst = self.extract_marked_rst(
350                    text, filepath, tagged_scope, src_comment
351                )
352                if marked_rst:
353                    self.marked_rst.append(marked_rst)
354
355    def merge_marked_content(self) -> None:
356        self.all_marked_content.extend(self.need_id_refs)
357        self.oneline_needs.sort(key=lambda x: x.source_map["start"]["row"])
358        self.all_marked_content.extend(self.oneline_needs)
359        self.all_marked_content.extend(self.marked_rst)
360        self.all_marked_content.sort(
361            key=lambda x: (x.filepath, x.source_map["start"]["row"])
362        )
363
364    def dump_marked_content(self, outdir: Path) -> None:
365        output_path = outdir / "marked_content.json"
366        if not output_path.parent.exists():
367            output_path.parent.mkdir(parents=True)
368        to_dump = [
369            marked_content.to_dict() for marked_content in self.all_marked_content
370        ]
371        with output_path.open("w") as f:
372            json.dump(to_dump, f)
373
374    def run(self) -> None:
375        self.create_src_objects()
376        self.extract_marked_content()
377        self.merge_marked_content()
378        self._log_summary()
379
380    def _log_summary(self) -> None:
381        """Emit a per-project marker (default-visible) plus a -v breakdown."""
382        label = f"codelinks [{self.name}]" if self.name else "codelinks"
383        logger.info(
384            f"{label}: {_count(len(self.src_files), 'file')}, "
385            f"{_count(len(self.all_marked_content), 'marker')}"
386        )
387        logger.debug(
388            f"{label}: {_count(len(self.src_comments), 'comment')}, "
389            f"{_count(len(self.oneline_needs), 'oneline need')}, "
390            f"{_count(len(self.need_id_refs), 'id-ref')}, "
391            f"{_count(len(self.marked_rst), 'marked-rst block')}"
392        )