Al-HUWAITI Shell
Al-huwaiti


Server : LiteSpeed
System : Linux in-mum-web1112.main-hosting.eu 4.18.0-553.34.1.lve.el8.x86_64 #1 SMP Thu Jan 9 16:30:32 UTC 2025 x86_64
User : u451330669 ( 451330669)
PHP Version : 8.2.27
Disable Function : NONE
Directory :  /opt/alt/python311/lib/python3.11/site-packages/redis/commands/json/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/alt/python311/lib/python3.11/site-packages/redis/commands/json/__init__.py
from json import JSONDecodeError, JSONDecoder, JSONEncoder

import redis

from ..helpers import nativestr
from .commands import JSONCommands
from .decoders import bulk_of_jsons, decode_list


class JSON(JSONCommands):
    """
    Client-side wrapper exposing the JSON module commands of a client.

    On construction a response callback is registered on the wrapped client
    for every command listed in ``MODULE_CALLBACKS``, so raw replies are
    converted to Python values.

    :param client: Client object being wrapped. It must provide
        ``execute_command``, ``set_response_callback`` and
        ``connection_pool``.
    :param version: Value stored as ``MODULE_VERSION``.
    :param decoder: An instance of json.JSONDecoder
    :param encoder: An instance of json.JSONEncoder
    """

    def __init__(
        self,
        client,
        version=None,
        decoder=JSONDecoder(),
        encoder=JSONEncoder(),
    ):
        """
        Create a client for talking to json.

        :param client: Client object being wrapped. Its ``execute_command``
            is reused directly and the module callbacks are registered on it
            through ``set_response_callback``.
        :param version: Value stored as ``MODULE_VERSION``.
        :param decoder: An instance of json.JSONDecoder used by ``_decode``.
            The default instance is created once, when the function is
            defined, and is shared by every call that omits the argument.
        :param encoder: An instance of json.JSONEncoder used by ``_encode``.
            The default instance is shared in the same way as ``decoder``.
        """
        # Set the module commands' callbacks
        self.MODULE_CALLBACKS = {
            "JSON.CLEAR": int,
            "JSON.DEL": int,
            "JSON.FORGET": int,
            "JSON.GET": self._decode,
            "JSON.MGET": bulk_of_jsons(self._decode),
            "JSON.SET": lambda r: r and nativestr(r) == "OK",
            "JSON.NUMINCRBY": self._decode,
            "JSON.NUMMULTBY": self._decode,
            "JSON.TOGGLE": self._decode,
            "JSON.STRAPPEND": self._decode,
            "JSON.STRLEN": self._decode,
            "JSON.ARRAPPEND": self._decode,
            "JSON.ARRINDEX": self._decode,
            "JSON.ARRINSERT": self._decode,
            "JSON.ARRLEN": self._decode,
            "JSON.ARRPOP": self._decode,
            "JSON.ARRTRIM": self._decode,
            "JSON.OBJLEN": self._decode,
            "JSON.OBJKEYS": self._decode,
            "JSON.RESP": self._decode,
            "JSON.DEBUG": self._decode,
        }

        self.client = client
        self.execute_command = client.execute_command
        self.MODULE_VERSION = version

        # Register every callback on the wrapped client.
        for key, value in self.MODULE_CALLBACKS.items():
            self.client.set_response_callback(key, value)

        self.__encoder__ = encoder
        self.__decoder__ = decoder

    def _decode(self, obj):
        """
        Convert a raw reply into a Python value.

        ``None`` is returned unchanged. Otherwise the reply is parsed with
        the configured decoder. If that is not possible, the reply is
        handed to ``decode_list`` as a fallback.

        :param obj: Raw reply to convert.
        :return: The decoded value, or the result of ``decode_list(obj)``
            when the reply cannot be parsed as JSON.
        """
        if obj is None:
            return obj

        try:
            x = self.__decoder__.decode(obj)
            if x is None:
                # A decoded ``None`` is deliberately routed through the same
                # fallback path as a reply of an unsupported type.
                raise TypeError
            return x
        except TypeError:
            # The decoder rejected the reply's type (the stdlib decoder does
            # this for bytes): retry on the text form when the reply has a
            # ``decode`` method.
            try:
                return self.__decoder__.decode(obj.decode())
            except (AttributeError, JSONDecodeError):
                # No ``decode`` method, or the decoded text is not valid
                # JSON. Both cases fall back to ``decode_list``, the same
                # way the outer handler treats undecodable ``str`` replies.
                return decode_list(obj)
        except (AttributeError, JSONDecodeError):
            return decode_list(obj)

    def _encode(self, obj):
        """
        Serialize ``obj`` with the configured encoder.

        :param obj: Value to serialize.
        :return: Whatever the encoder's ``encode`` method returns.
        """
        return self.__encoder__.encode(obj)

    def pipeline(self, transaction=True, shard_hint=None):
        """Creates a pipeline for the JSON module, that can be used for executing
        JSON commands, as well as classic core commands.

        The pipeline shares the wrapped client's connection pool, uses
        ``MODULE_CALLBACKS`` as its response callbacks, and gets this
        object's ``_encode`` and ``_decode`` attached.

        :param transaction: Passed through to ``Pipeline``.
        :param shard_hint: Passed through to ``Pipeline``.
        :return: The configured ``Pipeline`` instance.

        Usage example:

        r = redis.Redis()
        pipe = r.json().pipeline()
        pipe.jsonset('foo', '.', {'hello!': 'world'})
        pipe.jsonget('foo')
        pipe.jsonget('notakey')
        """
        p = Pipeline(
            connection_pool=self.client.connection_pool,
            response_callbacks=self.MODULE_CALLBACKS,
            transaction=transaction,
            shard_hint=shard_hint,
        )
        p._encode = self._encode
        p._decode = self._decode
        return p


class Pipeline(JSONCommands, redis.client.Pipeline):
    """
    Pipeline for the module.

    Combines ``JSONCommands`` with ``redis.client.Pipeline``. Instances are
    created by ``JSON.pipeline()``, which also attaches the ``_encode`` and
    ``_decode`` helpers of the originating ``JSON`` object.
    """

Al-HUWAITI Shell