vllm.parser

Modules:

- abstract_parser
- minimax_m2_parser: MiniMax M2 Parser - A unified parser for MiniMax M2 models.
- parser_manager

DelegatingParser

Bases: Parser

A Parser implementation that delegates to separate ReasoningParser and ToolParser instances.

This is the recommended base class for creating model-specific parsers that combine existing reasoning and tool parser implementations. Subclasses should set self._reasoning_parser and self._tool_parser in their __init__ method.

If either parser is None, the corresponding methods will return default values (no reasoning extraction, no tool calls).
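For example, a model-specific subclass only needs to wire up the two delegate parsers in its constructor. A minimal sketch, assuming hypothetical MyReasoningParser and MyToolParser implementations:

from vllm.parser.abstract_parser import DelegatingParser


class MyModelParser(DelegatingParser):
    def __init__(self, tokenizer, *args, **kwargs):
        super().__init__(tokenizer, *args, **kwargs)
        # MyReasoningParser and MyToolParser are hypothetical placeholders
        # for concrete ReasoningParser / ToolParser implementations.
        self._reasoning_parser = MyReasoningParser(tokenizer)
        self._tool_parser = MyToolParser(tokenizer)

All delegation logic (streaming state, phase hand-off, tool-choice handling) is then inherited from DelegatingParser.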

Source code in vllm/parser/abstract_parser.py
class DelegatingParser(Parser):
    """
    A Parser implementation that delegates to separate ReasoningParser and
    ToolParser instances.

    This is the recommended base class for creating model-specific parsers
    that combine existing reasoning and tool parser implementations.
    Subclasses should set `self._reasoning_parser` and `self._tool_parser`
    in their `__init__` method.

    If either parser is None, the corresponding methods will return default
    values (no reasoning extraction, no tool calls).
    """

    def extract_reasoning(
        self,
        model_output: str,
        request: ChatCompletionRequest | ResponsesRequest,
    ) -> tuple[str | None, str | None]:
        if self._reasoning_parser is None:
            return None, model_output
        return self._reasoning_parser.extract_reasoning(model_output, request)

    def extract_response_outputs(
        self,
        *,
        model_output: str,
        model_output_token_ids: Sequence[int],
        request: ResponsesRequest,
        enable_auto_tools: bool = False,
        tool_call_id_type: str = "random",
        logprobs: list[Logprob] | None = None,
    ) -> list[ResponseOutputItem]:
        # First extract reasoning
        reasoning, content = self.extract_reasoning(model_output, request)

        # Then parse tool calls from the content
        tool_calls, content = self._parse_tool_calls(
            request=request,
            content=content,
            enable_auto_tools=enable_auto_tools,
        )

        # Build output items
        outputs: list[ResponseOutputItem] = []

        # Add reasoning item if present
        if reasoning:
            reasoning_item = ResponseReasoningItem(
                id=f"rs_{random_uuid()}",
                summary=[],
                type="reasoning",
                content=[
                    ResponseReasoningTextContent(text=reasoning, type="reasoning_text")
                ],
                status=None,  # NOTE: Only the last output item has status.
            )
            outputs.append(reasoning_item)

        # Add message item if there's content
        if content:
            res_text_part = ResponseOutputText(
                text=content,
                annotations=[],
                type="output_text",
                logprobs=logprobs,
            )
            message_item = ResponseOutputMessage(
                id=f"msg_{random_uuid()}",
                content=[res_text_part],
                role="assistant",
                status="completed",
                type="message",
            )
            outputs.append(message_item)

        if tool_calls:
            # We use a simple counter for history_tool_call_count because
            # we don't track the history of tool calls in the Responses API yet.
            # This means that the tool call index will start from 0 for each
            # request.
            for history_tool_call_cnt, tool_call in enumerate(tool_calls):
                tool_call_item = ResponseFunctionToolCall(
                    id=f"fc_{random_uuid()}",
                    call_id=tool_call.id
                    if tool_call.id
                    else make_tool_call_id(
                        id_type=tool_call_id_type,
                        func_name=tool_call.name,
                        idx=history_tool_call_cnt,
                    ),
                    type="function_call",
                    status="completed",
                    name=tool_call.name,
                    arguments=tool_call.arguments,
                )
                outputs.append(tool_call_item)

        return outputs

    def _get_function_name(
        self, request: ChatCompletionRequest | ResponsesRequest
    ) -> str:
        if request.tool_choice and isinstance(request.tool_choice, ToolChoiceFunction):
            return request.tool_choice.name
        if request.tool_choice and isinstance(
            request.tool_choice, ChatCompletionNamedToolChoiceParam
        ):
            return request.tool_choice.function.name
        raise ValueError("Invalid tool_choice for function name extraction.")

    def _parse_tool_calls(
        self,
        request: ResponsesRequest,
        content: str | None,
        enable_auto_tools: bool,
    ) -> tuple[list[FunctionCall], str | None]:
        """
        TODO(qandrew): merge _parse_tool_calls_from_content
        for ChatCompletions into this function
        Parse tool calls from content based on request tool_choice settings.

        Returns:
            A tuple of (function_calls, remaining_content) if tool calls
            were parsed
        """
        function_calls: list[FunctionCall] = []

        if request.tool_choice and isinstance(
            request.tool_choice,
            (ToolChoiceFunction, ChatCompletionNamedToolChoiceParam),
        ):
            # Forced Function Call
            assert content is not None
            function_calls.append(
                FunctionCall(name=self._get_function_name(request), arguments=content)
            )
            return function_calls, None  # Clear content since tool is called.

        if request.tool_choice == "required":
            # Required tool calls - parse JSON
            tool_calls = []
            with contextlib.suppress(ValidationError):
                content = content or ""
                tool_calls = TypeAdapter(list[FunctionDefinition]).validate_json(
                    content
                )
            for tool_call in tool_calls:
                function_calls.append(
                    FunctionCall(
                        name=tool_call.name,
                        arguments=json.dumps(tool_call.parameters, ensure_ascii=False),
                    )
                )
            return function_calls, None  # Clear content since tool is called.

        if (
            self._tool_parser is not None
            and enable_auto_tools
            and (request.tool_choice == "auto" or request.tool_choice is None)
        ):
            # Automatic Tool Call Parsing
            tool_call_info = self._tool_parser.extract_tool_calls(
                content if content is not None else "",
                request=request,  # type: ignore
            )
            if tool_call_info is not None and tool_call_info.tools_called:
                function_calls.extend(
                    FunctionCall(
                        id=tool_call.id,
                        name=tool_call.function.name,
                        arguments=tool_call.function.arguments,
                    )
                    for tool_call in tool_call_info.tool_calls
                )
                remaining_content = tool_call_info.content
                if remaining_content and remaining_content.strip() == "":
                    remaining_content = None
                return function_calls, remaining_content

        # No tool calls
        return [], content

    def adjust_request(
        self, request: ChatCompletionRequest | ResponsesRequest
    ) -> ChatCompletionRequest | ResponsesRequest:
        if self._reasoning_parser is not None:
            request = self._reasoning_parser.adjust_request(request)
        if self._tool_parser is not None:
            request = self._tool_parser.adjust_request(request)
        return request

    def extract_reasoning_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],
        current_token_ids: Sequence[int],
        delta_token_ids: Sequence[int],
    ) -> DeltaMessage | None:
        if self._reasoning_parser is None:
            return DeltaMessage(content=delta_text)
        return self._reasoning_parser.extract_reasoning_streaming(
            previous_text,
            current_text,
            delta_text,
            previous_token_ids,
            current_token_ids,
            delta_token_ids,
        )

    def extract_tool_calls(
        self,
        model_output: str,
        request: ChatCompletionRequest,
    ) -> ExtractedToolCallInformation:
        if self._tool_parser is None:
            return ExtractedToolCallInformation(
                tools_called=False, tool_calls=[], content=model_output
            )
        return self._tool_parser.extract_tool_calls(model_output, request)

    def extract_tool_calls_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],
        current_token_ids: Sequence[int],
        delta_token_ids: Sequence[int],
        request: ChatCompletionRequest,
    ) -> DeltaMessage | None:
        if self._tool_parser is None:
            return None
        return self._tool_parser.extract_tool_calls_streaming(
            previous_text,
            current_text,
            delta_text,
            previous_token_ids,
            current_token_ids,
            delta_token_ids,
            request,
        )

    def _extract_tool_calls_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],
        current_token_ids: Sequence[int],
        delta_token_ids: Sequence[int],
        request: ChatCompletionRequest | ResponsesRequest,
        # The following parameters are used for "required" tool choice parsing and are
        # tracked in StreamState for streaming parsing.
        tool_call_idx: int | None = None,
        tool_call_id_type: str = "random",
        function_name_returned: bool = False,
    ) -> tuple[DeltaMessage | None, bool]:
        if request.tool_choice and isinstance(
            request.tool_choice,
            (ToolChoiceFunction, ChatCompletionNamedToolChoiceParam),
        ):
            delta_message, function_name_returned = extract_named_tool_call_streaming(
                delta_text=delta_text,
                function_name=self._get_function_name(request),
                function_name_returned=function_name_returned,
                tool_call_idx=tool_call_idx,
                tool_call_id_type=tool_call_id_type,
                tokenizer=self.model_tokenizer,
            )
            return delta_message, function_name_returned

        if request.tool_choice == "required":
            delta_message, function_name_returned = (
                extract_required_tool_call_streaming(
                    previous_text=previous_text,
                    current_text=current_text,
                    delta_text=delta_text,
                    function_name_returned=function_name_returned,
                    tool_call_idx=tool_call_idx,
                    tool_call_id_type=tool_call_id_type,
                )
            )
            return delta_message, function_name_returned
        return self.extract_tool_calls_streaming(
            previous_text,
            current_text,
            delta_text,
            previous_token_ids,
            current_token_ids,
            delta_token_ids,
            request,  # type: ignore[arg-type]
        ), False

    def is_reasoning_end(self, input_ids: list[int]) -> bool:
        if self._reasoning_parser is None:
            return False
        return self._reasoning_parser.is_reasoning_end(input_ids)

    def extract_content_ids(self, input_ids: list[int]) -> list[int]:
        if self._reasoning_parser is None:
            return input_ids
        return self._reasoning_parser.extract_content_ids(input_ids)

    def _in_reasoning_phase(self, state: StreamState) -> bool:
        if self._reasoning_parser is None:
            return False
        if self._tool_parser is None:
            return True
        return not state.reasoning_ended

    def _in_tool_call_phase(self, state: StreamState) -> bool:
        if self._tool_parser is None:
            return False
        if self._reasoning_parser is None:
            return True
        return state.reasoning_ended

    def parse_delta(
        self,
        delta_text: str,
        delta_token_ids: list[int],
        request: ChatCompletionRequest | ResponsesRequest,
        prompt_token_ids: list[int] | None = None,
    ) -> DeltaMessage | None:
        state = self._stream_state

        if not state.prompt_reasoning_checked and prompt_token_ids is not None:
            state.prompt_reasoning_checked = True
            if self.is_reasoning_end(prompt_token_ids):
                state.reasoning_ended = True

        current_text = state.previous_text + delta_text
        current_token_ids = state.previous_token_ids + delta_token_ids
        delta_message: DeltaMessage | None = None

        # Reasoning extraction
        if self._in_reasoning_phase(state):
            delta_message = self.extract_reasoning_streaming(
                previous_text=state.previous_text,
                current_text=current_text,
                delta_text=delta_text,
                previous_token_ids=state.previous_token_ids,
                current_token_ids=current_token_ids,
                delta_token_ids=delta_token_ids,
            )
            # Hand off remaining content to tool parser
            if self._tool_parser and self.is_reasoning_end(delta_token_ids):
                state.reasoning_ended = True
                current_token_ids = self.extract_content_ids(delta_token_ids)
                if delta_message and delta_message.content:
                    current_text = delta_message.content
                    delta_message.content = None
                else:
                    current_text = ""

        # Tool call extraction
        if self._in_tool_call_phase(state):
            if not state.tool_call_text_started:
                state.tool_call_text_started = True
                state.previous_text = ""
                state.previous_token_ids = []
                delta_text = current_text
                delta_token_ids = current_token_ids

            delta_message, state.function_name_returned = (
                self._extract_tool_calls_streaming(
                    previous_text=state.previous_text,
                    current_text=current_text,
                    delta_text=delta_text,
                    previous_token_ids=state.previous_token_ids,
                    current_token_ids=current_token_ids,
                    delta_token_ids=delta_token_ids,
                    request=request,  # type: ignore[arg-type]
                    tool_call_idx=state.history_tool_call_cnt,
                    tool_call_id_type=state.tool_call_id_type,
                    function_name_returned=state.function_name_returned,
                )
            )

        # No parsers: pass through as content
        if self._reasoning_parser is None and self._tool_parser is None:
            delta_message = DeltaMessage(content=delta_text)

        state.previous_text = current_text
        state.previous_token_ids = current_token_ids
        return delta_message

_parse_tool_calls

_parse_tool_calls(
    request: ResponsesRequest,
    content: str | None,
    enable_auto_tools: bool,
) -> tuple[list[FunctionCall], str | None]

Parse tool calls from content based on request tool_choice settings.

TODO(qandrew): merge _parse_tool_calls_from_content for ChatCompletions into this function.

Returns:

    tuple[list[FunctionCall], str | None]: A tuple of (function_calls, remaining_content) if tool calls were parsed.
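In the tool_choice == "required" branch, the content is expected to be a JSON array of function definitions. A sketch of the shape involved, with an illustrative tool name and parameters:

# Illustrative content for tool_choice == "required"; the tool name and
# parameters are made up for the example.
content = '[{"name": "get_weather", "parameters": {"city": "Paris"}}]'

# The method validates it via
# TypeAdapter(list[FunctionDefinition]).validate_json(content), then emits
# FunctionCall(name="get_weather", arguments='{"city": "Paris"}')
# and clears the remaining content.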

Source code in vllm/parser/abstract_parser.py
def _parse_tool_calls(
    self,
    request: ResponsesRequest,
    content: str | None,
    enable_auto_tools: bool,
) -> tuple[list[FunctionCall], str | None]:
    """
    TODO(qandrew): merge _parse_tool_calls_from_content
    for ChatCompletions into this function
    Parse tool calls from content based on request tool_choice settings.

    Returns:
        A tuple of (function_calls, remaining_content) if tool calls
        were parsed
    """
    function_calls: list[FunctionCall] = []

    if request.tool_choice and isinstance(
        request.tool_choice,
        (ToolChoiceFunction, ChatCompletionNamedToolChoiceParam),
    ):
        # Forced Function Call
        assert content is not None
        function_calls.append(
            FunctionCall(name=self._get_function_name(request), arguments=content)
        )
        return function_calls, None  # Clear content since tool is called.

    if request.tool_choice == "required":
        # Required tool calls - parse JSON
        tool_calls = []
        with contextlib.suppress(ValidationError):
            content = content or ""
            tool_calls = TypeAdapter(list[FunctionDefinition]).validate_json(
                content
            )
        for tool_call in tool_calls:
            function_calls.append(
                FunctionCall(
                    name=tool_call.name,
                    arguments=json.dumps(tool_call.parameters, ensure_ascii=False),
                )
            )
        return function_calls, None  # Clear content since tool is called.

    if (
        self._tool_parser is not None
        and enable_auto_tools
        and (request.tool_choice == "auto" or request.tool_choice is None)
    ):
        # Automatic Tool Call Parsing
        tool_call_info = self._tool_parser.extract_tool_calls(
            content if content is not None else "",
            request=request,  # type: ignore
        )
        if tool_call_info is not None and tool_call_info.tools_called:
            function_calls.extend(
                FunctionCall(
                    id=tool_call.id,
                    name=tool_call.function.name,
                    arguments=tool_call.function.arguments,
                )
                for tool_call in tool_call_info.tool_calls
            )
            remaining_content = tool_call_info.content
            if remaining_content and remaining_content.strip() == "":
                remaining_content = None
            return function_calls, remaining_content

    # No tool calls
    return [], content

Parser

Abstract Parser class that unifies ReasoningParser and ToolParser into a single interface for parsing model output.

This class provides a unified way to handle both reasoning extraction (e.g., chain-of-thought content in <think> tags) and tool call extraction (e.g., function calls in XML/JSON format) from model outputs.

Subclasses can either:

1. Override the abstract methods directly for custom parsing logic
2. Set reasoning_parser and tool_parser properties to delegate to existing parser implementations

Class Attributes:

    reasoning_parser_cls: The ReasoningParser class to use (for compatibility with code that needs the class, not instance).
    tool_parser_cls: The ToolParser class to use (for compatibility with code that needs the class, not instance).
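As a sketch of option 1 above, a subclass can implement the token-level hooks directly. The end-of-reasoning token ID below is an illustrative assumption, and the remaining abstract methods are omitted:

THINK_END_TOKEN_ID = 128003  # hypothetical ID of the "</think>" token


class MyUnifiedParser(Parser):
    def is_reasoning_end(self, input_ids: list[int]) -> bool:
        return THINK_END_TOKEN_ID in input_ids

    def extract_content_ids(self, input_ids: list[int]) -> list[int]:
        if THINK_END_TOKEN_ID not in input_ids:
            return []
        # Everything after the end-of-reasoning token is content.
        return input_ids[input_ids.index(THINK_END_TOKEN_ID) + 1 :]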

Source code in vllm/parser/abstract_parser.py
class Parser:
    """
    Abstract Parser class that unifies ReasoningParser and ToolParser into
    a single interface for parsing model output.

    This class provides a unified way to handle both reasoning extraction
    (e.g., chain-of-thought content in <think> tags) and tool call extraction
    (e.g., function calls in XML/JSON format) from model outputs.

    Subclasses can either:
    1. Override the abstract methods directly for custom parsing logic
    2. Set `reasoning_parser` and `tool_parser` properties to delegate to
       existing parser implementations

    Class Attributes:
        reasoning_parser_cls: The ReasoningParser class to use (for compatibility
            with code that needs the class, not instance).
        tool_parser_cls: The ToolParser class to use (for compatibility with
            code that needs the class, not instance).
    """

    # Class-level parser classes for compatibility with existing patterns
    # Subclasses should override these if they use specific parser classes
    reasoning_parser_cls: type[ReasoningParser] | None = None
    tool_parser_cls: type[ToolParser] | None = None

    def __init__(self, tokenizer: TokenizerLike, *args, **kwargs):
        """
        Initialize the Parser.

        Args:
            tokenizer: The tokenizer used by the model. This is required for
                token-based parsing operations.
        """
        self.model_tokenizer = tokenizer
        self._reasoning_parser: ReasoningParser | None = None
        self._tool_parser: ToolParser | None = None
        self._stream_state = StreamState()

    @cached_property
    def vocab(self) -> dict[str, int]:
        """Get the vocabulary mapping from tokens to IDs."""
        return self.model_tokenizer.get_vocab()

    @property
    def reasoning_parser(self) -> ReasoningParser | None:
        """The underlying reasoning parser, if any."""
        return self._reasoning_parser

    @reasoning_parser.setter
    def reasoning_parser(self, parser: ReasoningParser | None) -> None:
        self._reasoning_parser = parser

    @property
    def tool_parser(self) -> ToolParser | None:
        """The underlying tool parser, if any."""
        return self._tool_parser

    @tool_parser.setter
    def tool_parser(self, parser: ToolParser | None) -> None:
        self._tool_parser = parser

    # ========== Reasoning Parser Methods ==========

    @abstractmethod
    def is_reasoning_end(self, input_ids: list[int]) -> bool:
        """
        Check if the reasoning content ends in the input_ids.

        Used by structured engines like `xgrammar` to check if the
        reasoning content ends in the model output.

        Args:
            input_ids: The token IDs of the model output.

        Returns:
            True if the reasoning content ends in the input_ids.
        """

    def is_reasoning_end_streaming(
        self, input_ids: list[int], delta_ids: list[int]
    ) -> bool:
        """
        Check if the reasoning content ends during a decode step.

        Args:
            input_ids: The entire model output token IDs.
            delta_ids: The last few computed tokens at the current decode step.

        Returns:
            True if the reasoning content ends in the delta_ids.
        """
        return self.is_reasoning_end(input_ids)

    @abstractmethod
    def extract_content_ids(self, input_ids: list[int]) -> list[int]:
        """
        Extract content token IDs from the input_ids.

        This extracts the non-reasoning content (e.g., everything after
        the </think> tag).

        Args:
            input_ids: The token IDs of the model output.

        Returns:
            The extracted content token IDs.
        """

    @abstractmethod
    def extract_response_outputs(
        self,
        *,
        model_output: str,
        model_output_token_ids: Sequence[int],
        request: ResponsesRequest,
        enable_auto_tools: bool = False,
        tool_call_id_type: str = "random",
        logprobs: list[Logprob] | None = None,
    ) -> list[ResponseOutputItem]:
        """
        Extract reasoning, content, and tool calls from a complete
        model-generated string and return as ResponseOutputItem objects.

        Used for non-streaming responses where we have the entire model
        response available before sending to the client.

        Args:
            model_output: The complete model-generated string.
            model_output_token_ids: The token IDs of the model output.
            request: The request object used to generate the output.
            enable_auto_tools: Whether to enable automatic tool call parsing.
            tool_call_id_type: Type of tool call ID generation ("random", etc).
            logprobs: Pre-computed logprobs for the output text, if any.

        Returns:
            A list of ResponseOutputItem objects.
        """

    @abstractmethod
    def extract_reasoning(
        self,
        model_output: str,
        request: ChatCompletionRequest | ResponsesRequest,
    ) -> tuple[str | None, str | None]:
        """
        Extract reasoning content from a complete model-generated string.

        Used for non-streaming responses where we have the entire model
        response available before sending to the client.

        Args:
            model_output: The complete model-generated string.
            request: The request object used to generate the output.

        Returns:
            A tuple of (reasoning, response_content).
        """

    @abstractmethod
    def extract_reasoning_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],
        current_token_ids: Sequence[int],
        delta_token_ids: Sequence[int],
    ) -> DeltaMessage | None:
        """
        Extract reasoning content from a streaming delta message.

        Args:
            previous_text: Text from all previous tokens.
            current_text: Text including the current delta.
            delta_text: The new text in this delta.
            previous_token_ids: Token IDs from previous generation.
            current_token_ids: All token IDs including current.
            delta_token_ids: The new token IDs in this delta.

        Returns:
            A DeltaMessage with reasoning and/or content fields, or None.
        """

    # ========== Tool Parser Methods ==========

    def adjust_request(
        self, request: ChatCompletionRequest | ResponsesRequest
    ) -> ChatCompletionRequest | ResponsesRequest:
        """
        Adjust the request parameters for tool calling.

        Can be overridden by subclasses to modify request parameters
        (e.g., setting structured output schemas for tool calling).

        Args:
            request: The original request.

        Returns:
            The adjusted request.
        """
        return request

    @abstractmethod
    def extract_tool_calls(
        self,
        model_output: str,
        request: ChatCompletionRequest,
    ) -> ExtractedToolCallInformation:
        """
        Extract tool calls from a complete model-generated string.

        Used for non-streaming responses.

        Args:
            model_output: The complete model-generated string.
            request: The request object used to generate the output.

        Returns:
            ExtractedToolCallInformation containing the tool calls.
        """

    @abstractmethod
    def extract_tool_calls_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],
        current_token_ids: Sequence[int],
        delta_token_ids: Sequence[int],
        request: ChatCompletionRequest,
    ) -> DeltaMessage | None:
        """
        Extract tool calls from a streaming delta message.

        Args:
            previous_text: Text from all previous tokens.
            current_text: Text including the current delta.
            delta_text: The new text in this delta.
            previous_token_ids: Token IDs from previous generation.
            current_token_ids: All token IDs including current.
            delta_token_ids: The new token IDs in this delta.
            request: The request object.

        Returns:
            A DeltaMessage with tool_calls field, or None.
        """

    @abstractmethod
    def parse_delta(
        self,
        delta_text: str,
        delta_token_ids: list[int],
        request: ChatCompletionRequest | ResponsesRequest,
        prompt_token_ids: list[int] | None = None,
    ) -> DeltaMessage | None:
        """Parse a single streaming delta, orchestrating reasoning then
        tool call extraction via internal stream state.
        """

reasoning_parser property writable

reasoning_parser: ReasoningParser | None

The underlying reasoning parser, if any.

tool_parser property writable

tool_parser: ToolParser | None

The underlying tool parser, if any.

vocab cached property

vocab: dict[str, int]

Get the vocabulary mapping from tokens to IDs.

__init__

__init__(tokenizer: TokenizerLike, *args, **kwargs)

Initialize the Parser.

Parameters:

    tokenizer (TokenizerLike, required): The tokenizer used by the model. This is required for token-based parsing operations.
Source code in vllm/parser/abstract_parser.py
def __init__(self, tokenizer: TokenizerLike, *args, **kwargs):
    """
    Initialize the Parser.

    Args:
        tokenizer: The tokenizer used by the model. This is required for
            token-based parsing operations.
    """
    self.model_tokenizer = tokenizer
    self._reasoning_parser: ReasoningParser | None = None
    self._tool_parser: ToolParser | None = None
    self._stream_state = StreamState()

adjust_request

Adjust the request parameters for tool calling.

Can be overridden by subclasses to modify request parameters (e.g., setting structured output schemas for tool calling).

Parameters:

    request (ChatCompletionRequest | ResponsesRequest, required): The original request.

Returns:

    ChatCompletionRequest | ResponsesRequest: The adjusted request.

Source code in vllm/parser/abstract_parser.py
def adjust_request(
    self, request: ChatCompletionRequest | ResponsesRequest
) -> ChatCompletionRequest | ResponsesRequest:
    """
    Adjust the request parameters for tool calling.

    Can be overridden by subclasses to modify request parameters
    (e.g., setting structured output schemas for tool calling).

    Args:
        request: The original request.

    Returns:
        The adjusted request.
    """
    return request

extract_content_ids abstractmethod

extract_content_ids(input_ids: list[int]) -> list[int]

Extract content token IDs from the input_ids.

This extracts the non-reasoning content (e.g., everything after the </think> tag).

Parameters:

    input_ids (list[int], required): The token IDs of the model output.

Returns:

    list[int]: The extracted content token IDs.

Source code in vllm/parser/abstract_parser.py
@abstractmethod
def extract_content_ids(self, input_ids: list[int]) -> list[int]:
    """
    Extract content token IDs from the input_ids.

    This extracts the non-reasoning content (e.g., everything after
    the </think> tag).

    Args:
        input_ids: The token IDs of the model output.

    Returns:
        The extracted content token IDs.
    """

extract_reasoning abstractmethod

extract_reasoning(
    model_output: str,
    request: ChatCompletionRequest | ResponsesRequest,
) -> tuple[str | None, str | None]

Extract reasoning content from a complete model-generated string.

Used for non-streaming responses where we have the entire model response available before sending to the client.

Parameters:

    model_output (str, required): The complete model-generated string.
    request (ChatCompletionRequest | ResponsesRequest, required): The request object used to generate the output.

Returns:

    tuple[str | None, str | None]: A tuple of (reasoning, response_content).
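For instance, with a reasoning parser that splits on a think tag (illustrative; the actual delimiters depend on the configured parser):

# Illustrative behavior for a parser that splits on "</think>":
model_output = "<think>Check the units first.</think>The answer is 42."
reasoning, content = parser.extract_reasoning(model_output, request)
# reasoning == "Check the units first."
# content == "The answer is 42."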

Source code in vllm/parser/abstract_parser.py
@abstractmethod
def extract_reasoning(
    self,
    model_output: str,
    request: ChatCompletionRequest | ResponsesRequest,
) -> tuple[str | None, str | None]:
    """
    Extract reasoning content from a complete model-generated string.

    Used for non-streaming responses where we have the entire model
    response available before sending to the client.

    Args:
        model_output: The complete model-generated string.
        request: The request object used to generate the output.

    Returns:
        A tuple of (reasoning, response_content).
    """

extract_reasoning_streaming abstractmethod

extract_reasoning_streaming(
    previous_text: str,
    current_text: str,
    delta_text: str,
    previous_token_ids: Sequence[int],
    current_token_ids: Sequence[int],
    delta_token_ids: Sequence[int],
) -> DeltaMessage | None

Extract reasoning content from a streaming delta message.

Parameters:

    previous_text (str, required): Text from all previous tokens.
    current_text (str, required): Text including the current delta.
    delta_text (str, required): The new text in this delta.
    previous_token_ids (Sequence[int], required): Token IDs from previous generation.
    current_token_ids (Sequence[int], required): All token IDs including current.
    delta_token_ids (Sequence[int], required): The new token IDs in this delta.

Returns:

    DeltaMessage | None: A DeltaMessage with reasoning and/or content fields, or None.

Source code in vllm/parser/abstract_parser.py
@abstractmethod
def extract_reasoning_streaming(
    self,
    previous_text: str,
    current_text: str,
    delta_text: str,
    previous_token_ids: Sequence[int],
    current_token_ids: Sequence[int],
    delta_token_ids: Sequence[int],
) -> DeltaMessage | None:
    """
    Extract reasoning content from a streaming delta message.

    Args:
        previous_text: Text from all previous tokens.
        current_text: Text including the current delta.
        delta_text: The new text in this delta.
        previous_token_ids: Token IDs from previous generation.
        current_token_ids: All token IDs including current.
        delta_token_ids: The new token IDs in this delta.

    Returns:
        A DeltaMessage with reasoning and/or content fields, or None.
    """

extract_response_outputs abstractmethod

extract_response_outputs(
    *,
    model_output: str,
    model_output_token_ids: Sequence[int],
    request: ResponsesRequest,
    enable_auto_tools: bool = False,
    tool_call_id_type: str = "random",
    logprobs: list[Logprob] | None = None,
) -> list[ResponseOutputItem]

Extract reasoning, content, and tool calls from a complete model-generated string and return as ResponseOutputItem objects.

Used for non-streaming responses where we have the entire model response available before sending to the client.

Parameters:

    model_output (str, required): The complete model-generated string.
    model_output_token_ids (Sequence[int], required): The token IDs of the model output.
    request (ResponsesRequest, required): The request object used to generate the output.
    enable_auto_tools (bool, default False): Whether to enable automatic tool call parsing.
    tool_call_id_type (str, default "random"): Type of tool call ID generation ("random", etc.).
    logprobs (list[Logprob] | None, default None): Pre-computed logprobs for the output text, if any.

Returns:

    list[ResponseOutputItem]: A list of ResponseOutputItem objects.
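For a DelegatingParser, the resulting list is ordered reasoning item, then message item, then tool call items, each present only when extracted. An illustrative shape (IDs are randomly generated at runtime):

# Illustrative output ordering from the DelegatingParser implementation above:
# [
#     ResponseReasoningItem(id="rs_...", content=[...], type="reasoning"),
#     ResponseOutputMessage(id="msg_...", content=[...], type="message"),
#     ResponseFunctionToolCall(id="fc_...", name=..., arguments=..., type="function_call"),
# ]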

Source code in vllm/parser/abstract_parser.py
@abstractmethod
def extract_response_outputs(
    self,
    *,
    model_output: str,
    model_output_token_ids: Sequence[int],
    request: ResponsesRequest,
    enable_auto_tools: bool = False,
    tool_call_id_type: str = "random",
    logprobs: list[Logprob] | None = None,
) -> list[ResponseOutputItem]:
    """
    Extract reasoning, content, and tool calls from a complete
    model-generated string and return as ResponseOutputItem objects.

    Used for non-streaming responses where we have the entire model
    response available before sending to the client.

    Args:
        model_output: The complete model-generated string.
        model_output_token_ids: The token IDs of the model output.
        request: The request object used to generate the output.
        enable_auto_tools: Whether to enable automatic tool call parsing.
        tool_call_id_type: Type of tool call ID generation ("random", etc).
        logprobs: Pre-computed logprobs for the output text, if any.

    Returns:
        A list of ResponseOutputItem objects.
    """

extract_tool_calls abstractmethod

extract_tool_calls(
    model_output: str, request: ChatCompletionRequest
) -> ExtractedToolCallInformation

Extract tool calls from a complete model-generated string.

Used for non-streaming responses.

Parameters:

    model_output (str, required): The complete model-generated string.
    request (ChatCompletionRequest, required): The request object used to generate the output.

Returns:

    ExtractedToolCallInformation: ExtractedToolCallInformation containing the tool calls.
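When no tool parser is configured, the DelegatingParser implementation shown earlier returns a no-op result:

# Behavior when self._tool_parser is None (from DelegatingParser above):
info = parser.extract_tool_calls(model_output, request)
# info.tools_called == False
# info.tool_calls == []
# info.content == model_output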

Source code in vllm/parser/abstract_parser.py
@abstractmethod
def extract_tool_calls(
    self,
    model_output: str,
    request: ChatCompletionRequest,
) -> ExtractedToolCallInformation:
    """
    Extract tool calls from a complete model-generated string.

    Used for non-streaming responses.

    Args:
        model_output: The complete model-generated string.
        request: The request object used to generate the output.

    Returns:
        ExtractedToolCallInformation containing the tool calls.
    """

extract_tool_calls_streaming abstractmethod

extract_tool_calls_streaming(
    previous_text: str,
    current_text: str,
    delta_text: str,
    previous_token_ids: Sequence[int],
    current_token_ids: Sequence[int],
    delta_token_ids: Sequence[int],
    request: ChatCompletionRequest,
) -> DeltaMessage | None

Extract tool calls from a streaming delta message.

Parameters:

    previous_text (str, required): Text from all previous tokens.
    current_text (str, required): Text including the current delta.
    delta_text (str, required): The new text in this delta.
    previous_token_ids (Sequence[int], required): Token IDs from previous generation.
    current_token_ids (Sequence[int], required): All token IDs including current.
    delta_token_ids (Sequence[int], required): The new token IDs in this delta.
    request (ChatCompletionRequest, required): The request object.

Returns:

    DeltaMessage | None: A DeltaMessage with tool_calls field, or None.

Source code in vllm/parser/abstract_parser.py
@abstractmethod
def extract_tool_calls_streaming(
    self,
    previous_text: str,
    current_text: str,
    delta_text: str,
    previous_token_ids: Sequence[int],
    current_token_ids: Sequence[int],
    delta_token_ids: Sequence[int],
    request: ChatCompletionRequest,
) -> DeltaMessage | None:
    """
    Extract tool calls from a streaming delta message.

    Args:
        previous_text: Text from all previous tokens.
        current_text: Text including the current delta.
        delta_text: The new text in this delta.
        previous_token_ids: Token IDs from previous generation.
        current_token_ids: All token IDs including current.
        delta_token_ids: The new token IDs in this delta.
        request: The request object.

    Returns:
        A DeltaMessage with tool_calls field, or None.
    """

is_reasoning_end abstractmethod

is_reasoning_end(input_ids: list[int]) -> bool

Check if the reasoning content ends in the input_ids.

Used by structured engines like xgrammar to check if the reasoning content ends in the model output.

Parameters:

    input_ids (list[int], required): The token IDs of the model output.

Returns:

    bool: True if the reasoning content ends in the input_ids.

Source code in vllm/parser/abstract_parser.py
@abstractmethod
def is_reasoning_end(self, input_ids: list[int]) -> bool:
    """
    Check if the reasoning content ends in the input_ids.

    Used by structured engines like `xgrammar` to check if the
    reasoning content ends in the model output.

    Args:
        input_ids: The token IDs of the model output.

    Returns:
        True if the reasoning content ends in the input_ids.
    """

is_reasoning_end_streaming

is_reasoning_end_streaming(
    input_ids: list[int], delta_ids: list[int]
) -> bool

Check if the reasoning content ends during a decode step.

Parameters:

    input_ids (list[int], required): The entire model output token IDs.
    delta_ids (list[int], required): The last few computed tokens at the current decode step.

Returns:

    bool: True if the reasoning content ends in the delta_ids.

Source code in vllm/parser/abstract_parser.py
def is_reasoning_end_streaming(
    self, input_ids: list[int], delta_ids: list[int]
) -> bool:
    """
    Check if the reasoning content ends during a decode step.

    Args:
        input_ids: The entire model output token IDs.
        delta_ids: The last few computed tokens at the current decode step.

    Returns:
        True if the reasoning content ends in the delta_ids.
    """
    return self.is_reasoning_end(input_ids)

parse_delta abstractmethod

parse_delta(
    delta_text: str,
    delta_token_ids: list[int],
    request: ChatCompletionRequest | ResponsesRequest,
    prompt_token_ids: list[int] | None = None,
) -> DeltaMessage | None

Parse a single streaming delta, orchestrating reasoning then tool call extraction via internal stream state.
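A typical caller invokes parse_delta once per decode step and forwards any resulting DeltaMessage to the client. A minimal sketch, where the parser/request objects are assumed from context and the token stream and emit callback are hypothetical:

# `stream` yields (delta_text, delta_token_ids) pairs and `emit` sends a
# chunk to the client; both are hypothetical stand-ins.
for delta_text, delta_token_ids in stream:
    delta_message = parser.parse_delta(
        delta_text=delta_text,
        delta_token_ids=delta_token_ids,
        request=request,
        prompt_token_ids=prompt_token_ids,
    )
    if delta_message is not None:
        # Forward reasoning/content/tool_call deltas to the client.
        emit(delta_message)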

Source code in vllm/parser/abstract_parser.py
@abstractmethod
def parse_delta(
    self,
    delta_text: str,
    delta_token_ids: list[int],
    request: ChatCompletionRequest | ResponsesRequest,
    prompt_token_ids: list[int] | None = None,
) -> DeltaMessage | None:
    """Parse a single streaming delta, orchestrating reasoning then
    tool call extraction via internal stream state.
    """

ParserManager

Central registry for Parser implementations.

Supports two registration modes:

- Eager registration via register_module
- Lazy registration via register_lazy_module

Source code in vllm/parser/parser_manager.py
class ParserManager:
    """
    Central registry for Parser implementations.

    Supports two registration modes:
      - Eager registration via `register_module`
      - Lazy registration via `register_lazy_module`
    """

    parsers: dict[str, type[Parser]] = {}
    lazy_parsers: dict[str, tuple[str, str]] = {}  # name -> (module_path, class_name)

    @classmethod
    def get_parser_internal(cls, name: str) -> type[Parser]:
        """
        Retrieve a registered or lazily registered Parser class.

        Args:
            name: The registered name of the parser.

        Returns:
            The Parser class.

        Raises:
            KeyError: If no parser is found under the given name.
        """
        if name in cls.parsers:
            return cls.parsers[name]

        if name in cls.lazy_parsers:
            return cls._load_lazy_parser(name)

        registered = ", ".join(cls.list_registered())
        raise KeyError(f"Parser '{name}' not found. Available parsers: {registered}")

    @classmethod
    def _load_lazy_parser(cls, name: str) -> type[Parser]:
        """Import and register a lazily loaded parser."""
        from vllm.parser.abstract_parser import Parser

        module_path, class_name = cls.lazy_parsers[name]
        try:
            mod = importlib.import_module(module_path)
            parser_cls = getattr(mod, class_name)
            if not issubclass(parser_cls, Parser):
                raise TypeError(
                    f"{class_name} in {module_path} is not a Parser subclass."
                )
            cls.parsers[name] = parser_cls  # cache
            return parser_cls
        except Exception as e:
            logger.exception(
                "Failed to import lazy parser '%s' from %s: %s",
                name,
                module_path,
                e,
            )
            raise

    @classmethod
    def _register_module(
        cls,
        module: type[Parser],
        module_name: str | list[str] | None = None,
        force: bool = True,
    ) -> None:
        """Register a Parser class immediately."""
        from vllm.parser.abstract_parser import Parser

        if not issubclass(module, Parser):
            raise TypeError(
                f"module must be subclass of Parser, but got {type(module)}"
            )

        if module_name is None:
            module_names = [module.__name__]
        elif isinstance(module_name, str):
            module_names = [module_name]
        elif is_list_of(module_name, str):
            module_names = module_name
        else:
            raise TypeError("module_name must be str, list[str], or None.")

        for name in module_names:
            if not force and name in cls.parsers:
                existed = cls.parsers[name]
                raise KeyError(f"{name} is already registered at {existed.__module__}")
            cls.parsers[name] = module

    @classmethod
    def register_lazy_module(cls, name: str, module_path: str, class_name: str) -> None:
        """
        Register a lazy module mapping for delayed import.

        Example:
            ParserManager.register_lazy_module(
                name="minimax_m2",
                module_path="vllm.parser.minimax_m2_parser",
                class_name="MiniMaxM2Parser",
            )
        """
        cls.lazy_parsers[name] = (module_path, class_name)

    @classmethod
    def register_module(
        cls,
        name: str | list[str] | None = None,
        force: bool = True,
        module: type[Parser] | None = None,
    ) -> type[Parser] | Callable[[type[Parser]], type[Parser]]:
        """
        Register a Parser class.

        Can be used as a decorator or called directly.

        Usage:
            @ParserManager.register_module("my_parser")
            class MyParser(Parser):
                ...

        Or:
            ParserManager.register_module(module=MyParser)
        """
        if not isinstance(force, bool):
            raise TypeError(f"force must be a boolean, but got {type(force)}")

        # Immediate registration
        if module is not None:
            cls._register_module(module=module, module_name=name, force=force)
            return module

        # Decorator usage
        def _decorator(obj: type[Parser]) -> type[Parser]:
            module_path = obj.__module__
            class_name = obj.__name__

            if isinstance(name, str):
                names = [name]
            elif is_list_of(name, str):
                names = name
            else:
                names = [class_name]

            for n in names:
                cls.lazy_parsers[n] = (module_path, class_name)

            return obj

        return _decorator

    @classmethod
    def list_registered(cls) -> list[str]:
        """Return names of all registered parsers."""
        return sorted(set(cls.parsers.keys()) | set(cls.lazy_parsers.keys()))

    @classmethod
    def import_parser(cls, plugin_path: str) -> None:
        """Import a user-defined parser from an arbitrary path."""
        module_name = os.path.splitext(os.path.basename(plugin_path))[0]
        try:
            import_from_path(module_name, plugin_path)
        except Exception:
            logger.exception(
                "Failed to load module '%s' from %s.", module_name, plugin_path
            )

    @classmethod
    def get_tool_parser(
        cls,
        tool_parser_name: str | None = None,
        enable_auto_tools: bool = False,
        model_name: str | None = None,
    ) -> type[ToolParser] | None:
        """Get the tool parser based on the name."""
        from vllm.tool_parsers import ToolParserManager

        parser: type[ToolParser] | None = None
        if not enable_auto_tools or tool_parser_name is None:
            return parser
        logger.info_once('"auto" tool choice has been enabled.')

        try:
            if (
                tool_parser_name == "pythonic"
                and model_name
                and model_name.startswith("meta-llama/Llama-3.2")
            ):
                logger.warning(
                    "Llama3.2 models may struggle to emit valid pythonic tool calls"
                )
            parser = ToolParserManager.get_tool_parser(tool_parser_name)
        except Exception as e:
            raise TypeError(
                "Error: --enable-auto-tool-choice requires "
                f"tool_parser:'{tool_parser_name}' which has not "
                "been registered"
            ) from e
        return parser

    @classmethod
    def get_reasoning_parser(
        cls,
        reasoning_parser_name: str | None,
    ) -> type[ReasoningParser] | None:
        """Get the reasoning parser based on the name."""
        from vllm.reasoning import ReasoningParserManager

        parser: type[ReasoningParser] | None = None
        if not reasoning_parser_name:
            return None
        try:
            parser = ReasoningParserManager.get_reasoning_parser(reasoning_parser_name)
            assert parser is not None
        except Exception as e:
            raise TypeError(f"{reasoning_parser_name=} has not been registered") from e
        return parser

    @classmethod
    def get_parser(
        cls,
        tool_parser_name: str | None = None,
        reasoning_parser_name: str | None = None,
        enable_auto_tools: bool = False,
        model_name: str | None = None,
    ) -> type[Parser] | None:
        """
        Get a unified Parser that handles both reasoning and tool parsing.

        This method checks if a unified Parser exists that can handle both
        reasoning extraction and tool call parsing. If no unified parser
        exists, it creates a DelegatingParser that wraps the individual
        reasoning and tool parsers.

        Args:
            tool_parser_name: The name of the tool parser.
            reasoning_parser_name: The name of the reasoning parser.
            enable_auto_tools: Whether auto tool choice is enabled.
            model_name: The model name for parser-specific warnings.

        Returns:
            A Parser class, or None if neither parser is specified.
        """
        from vllm.parser.abstract_parser import _WrappedParser

        if not tool_parser_name and not reasoning_parser_name:
            return None

        # Strategy 1: If both names match, check for a unified parser with that name
        if tool_parser_name and tool_parser_name == reasoning_parser_name:
            try:
                parser = cls.get_parser_internal(tool_parser_name)
                logger.info(
                    "Using unified parser '%s' for both reasoning and tool parsing.",
                    tool_parser_name,
                )
                return parser
            except KeyError:
                pass  # No unified parser with this name

        # Strategy 2: Check for parser with either name
        for name in [tool_parser_name, reasoning_parser_name]:
            if name:
                try:
                    parser = cls.get_parser_internal(name)
                    logger.info(
                        "Using unified parser '%s' for reasoning and tool parsing.",
                        name,
                    )
                    return parser
                except KeyError:
                    pass

        # Strategy 3: Create a DelegatingParser with the individual parser classes
        reasoning_parser_cls = cls.get_reasoning_parser(reasoning_parser_name)
        tool_parser_cls = cls.get_tool_parser(
            tool_parser_name, enable_auto_tools, model_name
        )

        if reasoning_parser_cls is None and tool_parser_cls is None:
            return None

        # Set the class-level attributes on the imported _WrappedParser
        _WrappedParser.reasoning_parser_cls = reasoning_parser_cls
        _WrappedParser.tool_parser_cls = tool_parser_cls

        return _WrappedParser
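Taken together, registration and lookup might be used as follows; the parser name and class are illustrative:

from vllm.parser.abstract_parser import DelegatingParser
from vllm.parser.parser_manager import ParserManager


@ParserManager.register_module("my_model")  # records a lazy mapping
class MyModelParser(DelegatingParser):
    ...


# The class is imported and cached on first lookup.
parser_cls = ParserManager.get_parser_internal("my_model")
parser = parser_cls(tokenizer)  # `tokenizer` is assumed to be available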

_load_lazy_parser classmethod

_load_lazy_parser(name: str) -> type[Parser]

Import and register a lazily loaded parser.

Source code in vllm/parser/parser_manager.py
@classmethod
def _load_lazy_parser(cls, name: str) -> type[Parser]:
    """Import and register a lazily loaded parser."""
    from vllm.parser.abstract_parser import Parser

    module_path, class_name = cls.lazy_parsers[name]
    try:
        mod = importlib.import_module(module_path)
        parser_cls = getattr(mod, class_name)
        if not issubclass(parser_cls, Parser):
            raise TypeError(
                f"{class_name} in {module_path} is not a Parser subclass."
            )
        cls.parsers[name] = parser_cls  # cache
        return parser_cls
    except Exception as e:
        logger.exception(
            "Failed to import lazy parser '%s' from %s: %s",
            name,
            module_path,
            e,
        )
        raise

_register_module classmethod

_register_module(
    module: type[Parser],
    module_name: str | list[str] | None = None,
    force: bool = True,
) -> None

Register a Parser class immediately.

Source code in vllm/parser/parser_manager.py
@classmethod
def _register_module(
    cls,
    module: type[Parser],
    module_name: str | list[str] | None = None,
    force: bool = True,
) -> None:
    """Register a Parser class immediately."""
    from vllm.parser.abstract_parser import Parser

    if not issubclass(module, Parser):
        raise TypeError(
            f"module must be subclass of Parser, but got {type(module)}"
        )

    if module_name is None:
        module_names = [module.__name__]
    elif isinstance(module_name, str):
        module_names = [module_name]
    elif is_list_of(module_name, str):
        module_names = module_name
    else:
        raise TypeError("module_name must be str, list[str], or None.")

    for name in module_names:
        if not force and name in cls.parsers:
            existed = cls.parsers[name]
            raise KeyError(f"{name} is already registered at {existed.__module__}")
        cls.parsers[name] = module

get_parser classmethod

get_parser(
    tool_parser_name: str | None = None,
    reasoning_parser_name: str | None = None,
    enable_auto_tools: bool = False,
    model_name: str | None = None,
) -> type[Parser] | None

Get a unified Parser that handles both reasoning and tool parsing.

This method checks if a unified Parser exists that can handle both reasoning extraction and tool call parsing. If no unified parser exists, it creates a DelegatingParser that wraps the individual reasoning and tool parsers.

Parameters:

    tool_parser_name (str | None, default None): The name of the tool parser.
    reasoning_parser_name (str | None, default None): The name of the reasoning parser.
    enable_auto_tools (bool, default False): Whether auto tool choice is enabled.
    model_name (str | None, default None): The model name for parser-specific warnings.

Returns:

    type[Parser] | None: A Parser class, or None if neither parser is specified.
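For example, with distinct names and no unified parser registered under either of them, resolution falls through to strategy 3; the names below are illustrative:

# Illustrative lookup; neither name is registered as a unified Parser.
parser_cls = ParserManager.get_parser(
    tool_parser_name="my_tool_parser",
    reasoning_parser_name="my_reasoning_parser",
    enable_auto_tools=True,
)
# parser_cls is _WrappedParser, with reasoning_parser_cls and
# tool_parser_cls set to the individually registered classes.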

Source code in vllm/parser/parser_manager.py
@classmethod
def get_parser(
    cls,
    tool_parser_name: str | None = None,
    reasoning_parser_name: str | None = None,
    enable_auto_tools: bool = False,
    model_name: str | None = None,
) -> type[Parser] | None:
    """
    Get a unified Parser that handles both reasoning and tool parsing.

    This method checks if a unified Parser exists that can handle both
    reasoning extraction and tool call parsing. If no unified parser
    exists, it creates a DelegatingParser that wraps the individual
    reasoning and tool parsers.

    Args:
        tool_parser_name: The name of the tool parser.
        reasoning_parser_name: The name of the reasoning parser.
        enable_auto_tools: Whether auto tool choice is enabled.
        model_name: The model name for parser-specific warnings.

    Returns:
        A Parser class, or None if neither parser is specified.
    """
    from vllm.parser.abstract_parser import _WrappedParser

    if not tool_parser_name and not reasoning_parser_name:
        return None

    # Strategy 1: If both names match, check for a unified parser with that name
    if tool_parser_name and tool_parser_name == reasoning_parser_name:
        try:
            parser = cls.get_parser_internal(tool_parser_name)
            logger.info(
                "Using unified parser '%s' for both reasoning and tool parsing.",
                tool_parser_name,
            )
            return parser
        except KeyError:
            pass  # No unified parser with this name

    # Strategy 2: Check for parser with either name
    for name in [tool_parser_name, reasoning_parser_name]:
        if name:
            try:
                parser = cls.get_parser_internal(name)
                logger.info(
                    "Using unified parser '%s' for reasoning and tool parsing.",
                    name,
                )
                return parser
            except KeyError:
                pass

    # Strategy 3: Create a DelegatingParser with the individual parser classes
    reasoning_parser_cls = cls.get_reasoning_parser(reasoning_parser_name)
    tool_parser_cls = cls.get_tool_parser(
        tool_parser_name, enable_auto_tools, model_name
    )

    if reasoning_parser_cls is None and tool_parser_cls is None:
        return None

    # Set the class-level attributes on the imported _WrappedParser
    _WrappedParser.reasoning_parser_cls = reasoning_parser_cls
    _WrappedParser.tool_parser_cls = tool_parser_cls

    return _WrappedParser

get_parser_internal classmethod

get_parser_internal(name: str) -> type[Parser]

Retrieve a registered or lazily registered Parser class.

Parameters:

    name (str): The registered name of the parser. Required.

Returns:

    type[Parser]: The Parser class.

Raises:

    KeyError: If no parser is found under the given name.
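
Since the lookup raises KeyError for unknown names, callers that treat a parser as optional should guard the call; a sketch:

    try:
        parser_cls = ParserManager.get_parser_internal("minimax_m2")
    except KeyError:
        parser_cls = None  # e.g. fall back to building a DelegatingParser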

Source code in vllm/parser/parser_manager.py
@classmethod
def get_parser_internal(cls, name: str) -> type[Parser]:
    """
    Retrieve a registered or lazily registered Parser class.

    Args:
        name: The registered name of the parser.

    Returns:
        The Parser class.

    Raises:
        KeyError: If no parser is found under the given name.
    """
    if name in cls.parsers:
        return cls.parsers[name]

    if name in cls.lazy_parsers:
        return cls._load_lazy_parser(name)

    registered = ", ".join(cls.list_registered())
    raise KeyError(f"Parser '{name}' not found. Available parsers: {registered}")

get_reasoning_parser classmethod

get_reasoning_parser(
    reasoning_parser_name: str | None,
) -> type[ReasoningParser] | None

Get the reasoning parser based on the name.
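
A sketch of a direct call ("deepseek_r1" stands in for any registered reasoning parser name):

    reasoning_cls = ParserManager.get_reasoning_parser("deepseek_r1")
    # Returns None for a falsy name; raises TypeError if the name is unregistered.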

Source code in vllm/parser/parser_manager.py
@classmethod
def get_reasoning_parser(
    cls,
    reasoning_parser_name: str | None,
) -> type[ReasoningParser] | None:
    """Get the reasoning parser based on the name."""
    from vllm.reasoning import ReasoningParserManager

    parser: type[ReasoningParser] | None = None
    if not reasoning_parser_name:
        return None
    try:
        parser = ReasoningParserManager.get_reasoning_parser(reasoning_parser_name)
        assert parser is not None
    except Exception as e:
        raise TypeError(f"{reasoning_parser_name=} has not been registered") from e
    return parser

get_tool_parser classmethod

get_tool_parser(
    tool_parser_name: str | None = None,
    enable_auto_tools: bool = False,
    model_name: str | None = None,
) -> type[ToolParser] | None

Get the tool parser based on the name.
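
Note that the name is resolved only when enable_auto_tools is true; otherwise the method returns None regardless of the name. A sketch ("hermes" stands in for any registered tool parser name):

    # Auto tool choice disabled: always None.
    assert ParserManager.get_tool_parser("hermes", enable_auto_tools=False) is None

    # Enabled: resolves the class, or raises TypeError if unregistered.
    tool_cls = ParserManager.get_tool_parser("hermes", enable_auto_tools=True)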

Source code in vllm/parser/parser_manager.py
@classmethod
def get_tool_parser(
    cls,
    tool_parser_name: str | None = None,
    enable_auto_tools: bool = False,
    model_name: str | None = None,
) -> type[ToolParser] | None:
    """Get the tool parser based on the name."""
    from vllm.tool_parsers import ToolParserManager

    parser: type[ToolParser] | None = None
    if not enable_auto_tools or tool_parser_name is None:
        return parser
    logger.info_once('"auto" tool choice has been enabled.')

    try:
        if (
            tool_parser_name == "pythonic"
            and model_name
            and model_name.startswith("meta-llama/Llama-3.2")
        ):
            logger.warning(
                "Llama3.2 models may struggle to emit valid pythonic tool calls"
            )
        parser = ToolParserManager.get_tool_parser(tool_parser_name)
    except Exception as e:
        raise TypeError(
            "Error: --enable-auto-tool-choice requires "
            f"tool_parser:'{tool_parser_name}' which has not "
            "been registered"
        ) from e
    return parser

import_parser classmethod

import_parser(plugin_path: str) -> None

Import a user-defined parser from an arbitrary path.
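
A sketch with a hypothetical plugin path; the imported module is expected to register its parser as a side effect, for example via the register_module decorator:

    # /opt/plugins/my_parser.py is hypothetical; its module body should call
    # ParserManager.register_module (or register_lazy_module) on import.
    ParserManager.import_parser("/opt/plugins/my_parser.py")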

Source code in vllm/parser/parser_manager.py
@classmethod
def import_parser(cls, plugin_path: str) -> None:
    """Import a user-defined parser from an arbitrary path."""
    module_name = os.path.splitext(os.path.basename(plugin_path))[0]
    try:
        import_from_path(module_name, plugin_path)
    except Exception:
        logger.exception(
            "Failed to load module '%s' from %s.", module_name, plugin_path
        )

list_registered classmethod

list_registered() -> list[str]

Return names of all registered parsers.
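
For example, the returned list merges eagerly registered classes with lazily registered names, which is also what the error message in get_parser_internal reports:

    names = ParserManager.list_registered()
    # e.g. ['minimax_m2', 'my_parser', ...] depending on registrations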

Source code in vllm/parser/parser_manager.py
@classmethod
def list_registered(cls) -> list[str]:
    """Return names of all registered parsers."""
    return sorted(set(cls.parsers.keys()) | set(cls.lazy_parsers.keys()))

register_lazy_module classmethod

register_lazy_module(
    name: str, module_path: str, class_name: str
) -> None

Register a lazy module mapping for delayed import.

Example:

    ParserManager.register_lazy_module(
        name="minimax_m2",
        module_path="vllm.parser.minimax_m2_parser",
        class_name="MiniMaxM2Parser",
    )

Source code in vllm/parser/parser_manager.py
@classmethod
def register_lazy_module(cls, name: str, module_path: str, class_name: str) -> None:
    """
    Register a lazy module mapping for delayed import.

    Example:
        ParserManager.register_lazy_module(
            name="minimax_m2",
            module_path="vllm.parser.minimax_m2_parser",
            class_name="MiniMaxM2Parser",
        )
    """
    cls.lazy_parsers[name] = (module_path, class_name)

register_module classmethod

register_module(
    name: str | list[str] | None = None,
    force: bool = True,
    module: type[Parser] | None = None,
) -> type[Parser] | Callable[[type[Parser]], type[Parser]]

Register a Parser class.

Can be used as a decorator or called directly.

Usage:

    @ParserManager.register_module("my_parser")
    class MyParser(Parser):
        ...

Or:

    ParserManager.register_module(module=MyParser)

Source code in vllm/parser/parser_manager.py
@classmethod
def register_module(
    cls,
    name: str | list[str] | None = None,
    force: bool = True,
    module: type[Parser] | None = None,
) -> type[Parser] | Callable[[type[Parser]], type[Parser]]:
    """
    Register a Parser class.

    Can be used as a decorator or called directly.

    Usage:
        @ParserManager.register_module("my_parser")
        class MyParser(Parser):
            ...

    Or:
        ParserManager.register_module(module=MyParser)
    """
    if not isinstance(force, bool):
        raise TypeError(f"force must be a boolean, but got {type(force)}")

    # Immediate registration
    if module is not None:
        cls._register_module(module=module, module_name=name, force=force)
        return module

    # Decorator usage
    def _decorator(obj: type[Parser]) -> type[Parser]:
        module_path = obj.__module__
        class_name = obj.__name__

        if isinstance(name, str):
            names = [name]
        elif is_list_of(name, str):
            names = name
        else:
            names = [class_name]

        for n in names:
            cls.lazy_parsers[n] = (module_path, class_name)

        return obj

    return _decorator

_WrappedParser

Bases: DelegatingParser

A DelegatingParser subclass that instantiates parsers from class attributes.

This class is used to dynamically create a parser that wraps individual ReasoningParser and ToolParser classes. The class attributes reasoning_parser_cls and tool_parser_cls should be set before instantiation.

Usage:

    _WrappedParser.reasoning_parser_cls = MyReasoningParser
    _WrappedParser.tool_parser_cls = MyToolParser
    parser = _WrappedParser(tokenizer)
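
This is precisely how get_parser builds its Strategy 3 fallback: it assigns the classes resolved by get_reasoning_parser and get_tool_parser onto _WrappedParser and returns it.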

Source code in vllm/parser/abstract_parser.py
class _WrappedParser(DelegatingParser):
    """
    A DelegatingParser subclass that instantiates parsers from class attributes.

    This class is used to dynamically create a parser that wraps individual
    ReasoningParser and ToolParser classes. The class attributes
    `reasoning_parser_cls` and `tool_parser_cls` should be set before
    instantiation.

    Usage:
        _WrappedParser.reasoning_parser_cls = MyReasoningParser
        _WrappedParser.tool_parser_cls = MyToolParser
        parser = _WrappedParser(tokenizer)
    """

    reasoning_parser_cls: type[ReasoningParser] | None = None
    tool_parser_cls: type[ToolParser] | None = None

    def __init__(
        self, tokenizer: TokenizerLike, tools: list[Tool] | None = None, **kwargs
    ):
        super().__init__(tokenizer)
        # Instantiate the underlying parsers from class attributes
        if self.__class__.reasoning_parser_cls is not None:
            self._reasoning_parser = self.__class__.reasoning_parser_cls(
                tokenizer, **kwargs
            )
        if self.__class__.tool_parser_cls is not None:
            self._tool_parser = self.__class__.tool_parser_cls(tokenizer, tools)