1from collections.abc import Generator
  2from dataclasses import dataclass
  3import json
  4import logging
  5from pathlib import Path
  6from typing import Any, TypedDict
  7
  8from tree_sitter import Node as TreeSitterNode
  9
 10from sphinx_codelinks.analyse import utils
 11from sphinx_codelinks.analyse.models import (
 12    MarkedContentType,
 13    MarkedRst,
 14    NeedIdRefs,
 15    OneLineNeed,
 16    SourceComment,
 17    SourceFile,
 18    SourceMap,
 19)
 20from sphinx_codelinks.analyse.oneline_parser import (
 21    OnelineParserInvalidWarning,
 22    oneline_parser,
 23)
 24from sphinx_codelinks.config import (
 25    UNIX_NEWLINE,
 26    OneLineCommentStyle,
 27    SourceAnalyseConfig,
 28)
 29
 30# initialize logger
 31logger = logging.getLogger(__name__)
 32logger.setLevel(logging.INFO)
 33# log to the console
 34console = logging.StreamHandler()
 35console.setLevel(logging.INFO)
 36logger.addHandler(console)
 37
 38
 39class AnalyseWarningType(TypedDict):
 40    file_path: str
 41    lineno: int
 42    msg: str
 43    type: str
 44    sub_type: str
 45
 46
 47@dataclass
 48class AnalyseWarning:
 49    file_path: str
 50    lineno: int
 51    msg: str
 52    type: str
 53    sub_type: str
 54
 55
 56class SourceAnalyse:
 57    def __init__(
 58        self,
 59        analyse_config: SourceAnalyseConfig,
 60    ) -> None:
 61        self.analyse_config = analyse_config
 62        self.src_files: list[SourceFile] = []
 63        self.src_comments: list[SourceComment] = []
 64        self.need_id_refs: list[NeedIdRefs] = []
 65        self.oneline_needs: list[OneLineNeed] = []
 66        self.marked_rst: list[MarkedRst] = []
 67        self.all_marked_content: list[NeedIdRefs | OneLineNeed | MarkedRst] = []
 68        # Use explicitly configured git_root if provided, otherwise auto-detect
 69        if self.analyse_config.git_root is not None:
 70            self.git_root: Path | None = self.analyse_config.git_root.resolve()
 71        else:
 72            self.git_root = utils.locate_git_root(self.analyse_config.src_dir)
 73        self.git_remote_url: str | None = (
 74            utils.get_remote_url(self.git_root) if self.git_root else None
 75        )
 76        self.git_commit_rev: str | None = (
 77            utils.get_current_rev(self.git_root) if self.git_root else None
 78        )
 79        self.project_path: Path = (
 80            self.git_root if self.git_root else self.analyse_config.src_dir
 81        )
 82        self.oneline_warnings: list[AnalyseWarning] = []
 83
 84    def get_src_strings(self) -> Generator[tuple[Path, bytes], Any, None]:  # type: ignore[explicit-any]
 85        """Load source files and extract their content."""
 86        for src_path in self.analyse_config.src_files:
 87            if not utils.is_text_file(src_path):
 88                continue
 89            with src_path.open("r", encoding="utf-8", newline="") as f:
 90                # Normalize all line endings to Unix LF
 91                text = f.read()
 92            text = text.replace("\r\n", "\n").replace("\r", "\n")
 93            yield src_path, text.encode("utf-8")
 94
 95    def create_src_objects(self) -> None:
 96        parser, query = utils.init_tree_sitter(self.analyse_config.comment_type)
 97
 98        for src_path, src_string in self.get_src_strings():
 99            comments: list[TreeSitterNode] | None = utils.extract_comments(
100                src_string, parser, query
101            )
102            if not comments:
103                continue
104            src_comments: list[SourceComment] = [
105                SourceComment(node) for node in comments
106            ]
107
108            src_file = SourceFile(src_path.absolute())
109            src_file.add_comments(src_comments)
110            self.src_files.append(src_file)
111            self.src_comments.extend(src_comments)
112
113        logger.info(f"Source files loaded: {len(self.src_files)}")
114        logger.info(f"Source comments extracted: {len(self.src_comments)}")
115
116    def extract_marker(
117        self,
118        text: str,
119    ) -> Generator[tuple[str, list[str], int, int, int], None, None]:
120        lines = text.splitlines()
121        row_offset = 0
122        for line in lines:
123            for marker in self.analyse_config.need_id_refs_config.markers:
124                marker_idx = line.find(marker)
125                if marker_idx == -1:
126                    continue
127                markered_text = line[marker_idx + len(marker) :].strip()
128                need_ids = markered_text.replace(",", " ").split()
129                start_column = marker_idx + len(marker)
130                end_column = start_column + len(markered_text)
131                yield marker, need_ids, row_offset, start_column, end_column
132            row_offset += 1
133
[docs]134    # @Extract need ID references from code comments, IMPL_LNK_1, impl, [FE_LNK]
135    def extract_anchors(
136        self,
137        text: str,
138        filepath: Path,
139        tagged_scope: TreeSitterNode | None,
140        src_comment: SourceComment,
141    ) -> list[NeedIdRefs]:
142        """Extract need-ids-refs from a comment."""
143        anchors: list[NeedIdRefs] = []
144        for (
145            marker,
146            need_ids,
147            row_offset,
148            start_column,
149            end_column,
150        ) in self.extract_marker(text):
151            lineno = src_comment.node.start_point.row + row_offset + 1
152            remote_url = self.git_remote_url
153            if self.git_remote_url and self.git_commit_rev:
154                remote_url = utils.form_https_url(
155                    self.git_remote_url,
156                    self.git_commit_rev,
157                    self.project_path,
158                    filepath,
159                    lineno,
160                )
161            source_map: SourceMap = {
162                "start": {
163                    "row": lineno - 1,
164                    "column": start_column,
165                },
166                "end": {
167                    "row": lineno - 1,
168                    "column": end_column,
169                },
170            }
171            anchors.append(
172                NeedIdRefs(
173                    filepath,
174                    remote_url,
175                    source_map,
176                    src_comment,
177                    tagged_scope,
178                    need_ids,
179                    marker,
180                )
181            )
182        return anchors
183
184    def extract_oneline_need(
185        self,
186        text: str,
187        src_comment: SourceComment,
188        oneline_comment_style: OneLineCommentStyle,
189    ) -> Generator[tuple[dict[str, str | list[str] | int], int]]:
190        lines = text.splitlines(keepends=True)
191        row_offset = 0
192        if len(lines) == 1:
193            # single line comment has no newline char in the extracted comment
194            lines[0] = f"{lines[0]}{UNIX_NEWLINE}"
195
196        for line in lines:
197            resolved = oneline_parser(line, oneline_comment_style)
198            if not resolved:
199                row_offset += 1
200                continue
201            if isinstance(resolved, OnelineParserInvalidWarning):
202                if not src_comment.source_file:
203                    row_offset += 1
204                    continue
205                lineno = src_comment.node.start_point.row + row_offset + 1
206                warning = AnalyseWarning(
207                    str(src_comment.source_file.filepath),
208                    lineno,
209                    resolved.msg,
210                    MarkedContentType.need,
211                    resolved.sub_type.value,
212                )
213                self.oneline_warnings.append(warning)
214                row_offset += 1
215                continue
216            yield resolved, row_offset
217            row_offset += 1
218
[docs]219    # @Extract one-line traceability needs from comments, IMPL_ONE_1, impl, [FE_DEF, FE_CMT]
220    def extract_oneline_needs(
221        self,
222        text: str,
223        filepath: Path,
224        tagged_scope: TreeSitterNode | None,
225        src_comment: SourceComment,
226        oneline_comment_style: OneLineCommentStyle,
227    ) -> list[OneLineNeed]:
228        row_offset = 0
229        oneline_needs = []
230        for resolved, row_offset in self.extract_oneline_need(
231            text, src_comment, oneline_comment_style
232        ):
233            lineno = src_comment.node.start_point.row + row_offset + 1
234            remote_url = self.git_remote_url
235            if self.git_remote_url and self.git_commit_rev:
236                remote_url = utils.form_https_url(
237                    self.git_remote_url,
238                    self.git_commit_rev,
239                    self.project_path,
240                    filepath,
241                    lineno,
242                )
243            source_map: SourceMap = {
244                "start": {
245                    "row": lineno - 1,
246                    "column": resolved["start_column"],  # type: ignore[typeddict-item]  # dynamic keys
247                },
248                "end": {
249                    "row": lineno - 1,
250                    "column": resolved["end_column"],  # type: ignore[typeddict-item]  # dynamic keys
251                },
252            }
253            del resolved["start_column"]
254            del resolved["end_column"]
255            oneline_needs.append(
256                OneLineNeed(
257                    filepath,
258                    remote_url,
259                    source_map,
260                    src_comment,
261                    tagged_scope,
262                    resolved,  # type: ignore[arg-type] # int arguments were deleted
263                )
264            )
265        return oneline_needs
266
[docs]267    # @Extract marked reStructuredText blocks from comments, IMPL_MRST_1, impl, [FE_RST_EXTRACTION]
268    def extract_marked_rst(
269        self,
270        text: str,
271        filepath: Path,
272        tagged_scope: TreeSitterNode | None,
273        src_comment: SourceComment,
274    ) -> MarkedRst | None:
275        """Extract marked rst from a comment.
276
277        Presumably, only one marked rst text in a comment.
278        """
279        extracted_rst = utils.extract_rst(
280            text,
281            self.analyse_config.marked_rst_config.start_sequence,
282            self.analyse_config.marked_rst_config.end_sequence,
283        )
284        if not extracted_rst:
285            return None
286        if UNIX_NEWLINE in extracted_rst["rst_text"]:
287            rst_text = utils.remove_leading_sequences(extracted_rst["rst_text"], ["*"])
288        else:
289            rst_text = extracted_rst["rst_text"]
290        lineno = src_comment.node.start_point.row + extracted_rst["row_offset"] + 1
291        remote_url = self.git_remote_url
292        if self.git_remote_url and self.git_commit_rev:
293            remote_url = utils.form_https_url(
294                self.git_remote_url,
295                self.git_commit_rev,
296                self.project_path,
297                filepath,
298                lineno,
299            )
300        source_map: SourceMap = {
301            "start": {
302                "row": lineno - 1,
303                "column": extracted_rst["start_idx"],
304            },
305            "end": {
306                "row": lineno - 1,
307                "column": extracted_rst["end_idx"],
308            },
309        }
310        return MarkedRst(
311            filepath,
312            remote_url,
313            source_map,
314            src_comment,
315            tagged_scope,
316            rst_text,
317        )
318
319    def extract_marked_content(self) -> None:
320        for src_comment in self.src_comments:
321            text = (
322                src_comment.node.text.decode("utf-8") if src_comment.node.text else None
323            )
324            if not text:
325                continue
326            filepath = (
327                src_comment.source_file.filepath if src_comment.source_file else None
328            )
329            if not filepath:
330                continue
331            tagged_scope: TreeSitterNode | None = utils.find_associated_scope(
332                src_comment.node, self.analyse_config.comment_type
333            )
334            if self.analyse_config.get_need_id_refs:
335                anchors = self.extract_anchors(
336                    text, filepath, tagged_scope, src_comment
337                )
338                self.need_id_refs.extend(anchors)
339
340            if self.analyse_config.get_oneline_needs:
341                oneline_needs = self.extract_oneline_needs(
342                    text,
343                    filepath,
344                    tagged_scope,
345                    src_comment,
346                    self.analyse_config.oneline_comment_style,
347                )
348                self.oneline_needs.extend(oneline_needs)
349            if self.analyse_config.get_rst:
350                marked_rst = self.extract_marked_rst(
351                    text, filepath, tagged_scope, src_comment
352                )
353                if marked_rst:
354                    self.marked_rst.append(marked_rst)
355
356        if self.analyse_config.get_need_id_refs:
357            logger.info(f"Need-id-refs extracted: {len(self.need_id_refs)}")
358        if self.analyse_config.get_oneline_needs:
359            logger.info(f"Oneline needs extracted: {len(self.oneline_needs)}")
360        if self.analyse_config.get_rst:
361            logger.info(f"Marked rst extracted: {len(self.marked_rst)}")
362
363    def merge_marked_content(self) -> None:
364        self.all_marked_content.extend(self.need_id_refs)
365        self.oneline_needs.sort(key=lambda x: x.source_map["start"]["row"])
366        self.all_marked_content.extend(self.oneline_needs)
367        self.all_marked_content.extend(self.marked_rst)
368        self.all_marked_content.sort(
369            key=lambda x: (x.filepath, x.source_map["start"]["row"])
370        )
371
372    def dump_marked_content(self, outdir: Path) -> None:
373        output_path = outdir / "marked_content.json"
374        if not output_path.parent.exists():
375            output_path.parent.mkdir(parents=True)
376        to_dump = [
377            marked_content.to_dict() for marked_content in self.all_marked_content
378        ]
379        with output_path.open("w") as f:
380            json.dump(to_dump, f)
381        logger.info(f"Marked content dumped to {output_path}")
382
383    def run(self) -> None:
384        self.create_src_objects()
385        self.extract_marked_content()
386        self.merge_marked_content()