Module: Familia::Features::Relationships::RedisOperations

Defined in:
lib/familia/features/relationships/redis_operations.rb

Overview

Redis operations module providing atomic multi-collection operations and native Redis set operations for relationships

Instance Method Summary collapse

Instance Method Details

#atomic_operation(redis = nil) {|Redis| ... } ⇒ Array

Execute multiple Redis operations atomically using MULTI/EXEC

Examples:

Atomic multi-collection update

atomic_operation(redis) do |tx|
  tx.zadd("customer:123:domains", score, domain_id)
  tx.zadd("team:456:domains", score, domain_id)
  tx.hset("domain_index", domain_name, domain_id)
end

Parameters:

  • redis (Redis) (defaults to: nil)

    Redis connection to use

Yields:

  • (Redis)

    Yields Redis connection in transaction context

Returns:

  • (Array)

    Results from Redis transaction



21
22
23
24
25
26
27
# File 'lib/familia/features/relationships/redis_operations.rb', line 21

def atomic_operation(redis = nil)
  redis ||= redis_connection

  redis.multi do |tx|
    yield tx if block_given?
  end
end

#batch_zadd(redis_key, items, mode: :normal) ⇒ Object

Batch add multiple items to a sorted set

Examples:

Batch add domains with scores

batch_zadd("customer:domains", [
  { member: "domain1", score: encode_score(Time.now, permission: :read) },
  { member: "domain2", score: encode_score(Time.now, permission: :write) }
])

Parameters:

  • redis_key (String)

    Redis sorted set key

  • items (Array<Hash>)

    Array of String, score: Float hashes

  • mode (Symbol) (defaults to: :normal)

    Add mode (:normal, :nx, :xx, :lt, :gt)



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/familia/features/relationships/redis_operations.rb', line 146

def batch_zadd(redis_key, items, mode: :normal)
  return 0 if items.empty?

  redis = redis_connection
  zadd_args = items.flat_map { |item| [item[:score], item[:member]] }

  case mode
  when :nx
    redis.zadd(redis_key, zadd_args, nx: true)
  when :xx
    redis.zadd(redis_key, zadd_args, xx: true)
  when :lt
    redis.zadd(redis_key, zadd_args, lt: true)
  when :gt
    redis.zadd(redis_key, zadd_args, gt: true)
  else
    redis.zadd(redis_key, zadd_args)
  end
end

#cleanup_temp_keys(pattern = 'temp:*', batch_size = 100) ⇒ Object

Clean up expired temporary keys

Examples:

Clean up old temporary keys

cleanup_temp_keys("temp:user_*", 100)

Parameters:

  • pattern (String) (defaults to: 'temp:*')

    Pattern to match temporary keys

  • batch_size (Integer) (defaults to: 100)

    Number of keys to process at once



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/familia/features/relationships/redis_operations.rb', line 228

def cleanup_temp_keys(pattern = 'temp:*', batch_size = 100)
  self.class.dbclient
  cursor = 0

  loop do
    cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size)

    if keys.any?
      # Check TTL and remove keys that should have expired
      keys.each_slice(batch_size) do |key_batch|
        dbclient.pipelined do |pipeline|
          key_batch.each do |key|
            ttl = dbclient.ttl(key)
            pipeline.del(key) if ttl == -1 # Key exists but has no TTL
          end
        end
      end
    end

    break if cursor.zero?
  end
end

#create_temp_key(base_name, ttl = 300) ⇒ String

Create temporary Redis key with automatic cleanup

Examples:

temp_key = create_temp_key("user_accessible_domains", 600)
#=> "temp:user_accessible_domains:1704067200:abc123"

Parameters:

  • base_name (String)

    Base name for the temporary key

  • ttl (Integer) (defaults to: 300)

    TTL in seconds (default: 300)

Returns:

  • (String)

    Generated temporary key name



124
125
126
127
128
129
130
131
132
133
# File 'lib/familia/features/relationships/redis_operations.rb', line 124

def create_temp_key(base_name, ttl = 300)
  timestamp = Time.now.to_i
  random_suffix = SecureRandom.hex(3)
  temp_key = "temp:#{base_name}:#{timestamp}:#{random_suffix}"

  # Set immediate expiry to ensure cleanup even if operation fails
  redis_connection.expire(temp_key, ttl)

  temp_key
end

#query_by_score(redis_key, start_score = '-inf', end_score = '+inf', offset: 0, count: -1,, with_scores: false, min_permission: nil) ⇒ Array

Query sorted set with score filtering and permission checking

Examples:

Query domains with read permission or higher

query_by_score("customer:domains",
               encode_score(1.hour.ago, 0),
               encode_score(Time.now, MAX_METADATA),
               min_permission: :read)

Parameters:

  • redis_key (String)

    Redis sorted set key

  • start_score (Float) (defaults to: '-inf')

    Minimum score (inclusive)

  • end_score (Float) (defaults to: '+inf')

    Maximum score (inclusive)

  • offset (Integer) (defaults to: 0)

    Offset for pagination

  • count (Integer) (defaults to: -1,)

    Maximum number of results

  • with_scores (Boolean) (defaults to: false)

    Include scores in results

  • min_permission (Symbol) (defaults to: nil)

    Minimum permission level required

Returns:

  • (Array)

    Query results



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/familia/features/relationships/redis_operations.rb', line 182

def query_by_score(redis_key, start_score = '-inf', end_score = '+inf',
                   offset: 0, count: -1, with_scores: false, min_permission: nil)
  self.class.dbclient

  # Adjust score range for permission filtering
  if min_permission
    permission_value = ScoreEncoding.permission_level_value(min_permission)
    # Ensure minimum score includes required permission level
    if start_score.is_a?(Numeric)
      decoded = decode_score(start_score)
      if decoded[:permissions] < permission_value
        start_score = encode_score(decoded[:timestamp],
                                   permission_value)
      end
    else
      start_score = encode_score(0, permission_value)
    end
  end

  options = {
    limit: (count.positive? ? [offset, count] : nil),
    with_scores: with_scores
  }.compact

  results = dbclient.zrangebyscore(redis_key, start_score, end_score, **options)

  # Filter results by permission if needed using correct bitwise operations
  if min_permission && with_scores
    permission_mask = ScoreEncoding.permission_level_value(min_permission)
    results = results.select do |_member, score|
      decoded = decode_score(score)
      # Use bitwise AND to check if permission mask is satisfied
      decoded[:permissions].allbits?(permission_mask)
    end
  end

  results
end

#redis_connectionObject

Get Redis connection for the current class or instance



252
253
254
255
256
257
258
259
260
# File 'lib/familia/features/relationships/redis_operations.rb', line 252

def redis_connection
  if self.class.respond_to?(:dbclient)
    self.class.dbclient
  elsif respond_to?(:dbclient)
    dbclient
  else
    Familia.dbclient
  end
end

#set_operation(operation, destination, source_keys, weights: nil, aggregate: :sum, ttl: nil) ⇒ Integer

Perform Redis set operations (union, intersection, difference) on sorted sets

Examples:

Union of accessible domains

set_operation(:union, "temp:accessible_domains:#{user_id}",
              ["customer:domains", "team:domains", "org:domains"],
              ttl: 300)

Parameters:

  • operation (Symbol)

    Operation type (:union, :intersection, :difference)

  • destination (String)

    Redis key for result storage

  • source_keys (Array<String>)

    Source Redis keys to operate on

  • weights (Array<Float>) (defaults to: nil)

    Optional weights for union operations

  • aggregate (Symbol) (defaults to: :sum)

    Aggregation method (:sum, :min, :max)

  • ttl (Integer) (defaults to: nil)

    TTL for destination key in seconds

Returns:

  • (Integer)

    Number of elements in resulting set



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/familia/features/relationships/redis_operations.rb', line 79

def set_operation(operation, destination, source_keys, weights: nil, aggregate: :sum, ttl: nil)
  return 0 if source_keys.empty?

  redis = redis_connection

  atomic_operation(redis) do |tx|
    case operation
    when :union
      if weights
        tx.zunionstore(destination, source_keys.zip(weights).to_h, aggregate: aggregate)
      else
        tx.zunionstore(destination, source_keys, aggregate: aggregate)
      end
    when :intersection
      if weights
        tx.zinterstore(destination, source_keys.zip(weights).to_h, aggregate: aggregate)
      else
        tx.zinterstore(destination, source_keys, aggregate: aggregate)
      end
    when :difference
      first_key = source_keys.first
      other_keys = source_keys[1..] || []

      tx.zunionstore(destination, [first_key])
      other_keys.each do |key|
        members = redis.zrange(key, 0, -1)
        tx.zrem(destination, members) if members.any?
      end
    end

    tx.expire(destination, ttl) if ttl
  end

  redis.zcard(destination)
end

#update_multiple_presence(collections, action, identifier, default_score = nil) ⇒ Object

Update object presence in multiple collections atomically

Examples:

Update presence in multiple collections

update_multiple_presence([
  { key: "customer:123:domains", score: current_score },
  { key: "team:456:domains", score: permission_encode(Time.now, :read) },
  { key: "org:789:all_domains", score: current_score }
], :add, domain.identifier)

Parameters:

  • collections (Array<Hash>)

    Array of collection configurations

  • action (Symbol)

    Action to perform (:add, :remove)

  • identifier (String)

    Object identifier

  • default_score (Float) (defaults to: nil)

    Default score if not specified per collection



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/familia/features/relationships/redis_operations.rb', line 42

def update_multiple_presence(collections, action, identifier, default_score = nil)
  return unless collections&.any?

  redis = self.class.dbclient

  atomic_operation(redis) do |tx|
    collections.each do |collection_config|
      redis_key = collection_config[:key]
      score = collection_config[:score] || default_score || current_score

      case action
      when :add
        tx.zadd(redis_key, score, identifier)
      when :remove
        tx.zrem(redis_key, identifier)
      when :update
        # Use ZADD with XX flag to only update existing members
        tx.zadd(redis_key, score, identifier, xx: true)
      end
    end
  end
end