Server : LiteSpeed System : Linux in-mum-web1112.main-hosting.eu 4.18.0-553.34.1.lve.el8.x86_64 #1 SMP Thu Jan 9 16:30:32 UTC 2025 x86_64 User : u451330669 ( 451330669) PHP Version : 8.2.27 Disable Function : NONE Directory : /opt/alt/python311/lib/python3.11/site-packages/redis/commands/json/ |
from json import JSONDecodeError, JSONDecoder, JSONEncoder
import redis
from ..helpers import nativestr
from .commands import JSONCommands
from .decoders import bulk_of_jsons, decode_list
class JSON(JSONCommands):
"""
Create a client for talking to json.
:param decoder:
:type json.JSONDecoder: An instance of json.JSONDecoder
:param encoder:
:type json.JSONEncoder: An instance of json.JSONEncoder
"""
def __init__(
self,
client,
version=None,
decoder=JSONDecoder(),
encoder=JSONEncoder(),
):
"""
Create a client for talking to json.
:param decoder:
:type json.JSONDecoder: An instance of json.JSONDecoder
:param encoder:
:type json.JSONEncoder: An instance of json.JSONEncoder
"""
# Set the module commands' callbacks
self.MODULE_CALLBACKS = {
"JSON.CLEAR": int,
"JSON.DEL": int,
"JSON.FORGET": int,
"JSON.GET": self._decode,
"JSON.MGET": bulk_of_jsons(self._decode),
"JSON.SET": lambda r: r and nativestr(r) == "OK",
"JSON.NUMINCRBY": self._decode,
"JSON.NUMMULTBY": self._decode,
"JSON.TOGGLE": self._decode,
"JSON.STRAPPEND": self._decode,
"JSON.STRLEN": self._decode,
"JSON.ARRAPPEND": self._decode,
"JSON.ARRINDEX": self._decode,
"JSON.ARRINSERT": self._decode,
"JSON.ARRLEN": self._decode,
"JSON.ARRPOP": self._decode,
"JSON.ARRTRIM": self._decode,
"JSON.OBJLEN": self._decode,
"JSON.OBJKEYS": self._decode,
"JSON.RESP": self._decode,
"JSON.DEBUG": self._decode,
}
self.client = client
self.execute_command = client.execute_command
self.MODULE_VERSION = version
for key, value in self.MODULE_CALLBACKS.items():
self.client.set_response_callback(key, value)
self.__encoder__ = encoder
self.__decoder__ = decoder
def _decode(self, obj):
"""Get the decoder."""
if obj is None:
return obj
try:
x = self.__decoder__.decode(obj)
if x is None:
raise TypeError
return x
except TypeError:
try:
return self.__decoder__.decode(obj.decode())
except AttributeError:
return decode_list(obj)
except (AttributeError, JSONDecodeError):
return decode_list(obj)
def _encode(self, obj):
"""Get the encoder."""
return self.__encoder__.encode(obj)
def pipeline(self, transaction=True, shard_hint=None):
"""Creates a pipeline for the JSON module, that can be used for executing
JSON commands, as well as classic core commands.
Usage example:
r = redis.Redis()
pipe = r.json().pipeline()
pipe.jsonset('foo', '.', {'hello!': 'world'})
pipe.jsonget('foo')
pipe.jsonget('notakey')
"""
p = Pipeline(
connection_pool=self.client.connection_pool,
response_callbacks=self.MODULE_CALLBACKS,
transaction=transaction,
shard_hint=shard_hint,
)
p._encode = self._encode
p._decode = self._decode
return p
class Pipeline(JSONCommands, redis.client.Pipeline):
"""Pipeline for the module."""