FEATURE: support custom codecs for transport (#250)

FEATURE: support custom codecs for transport (#250)

Adds support for MessageBus.transport_codec which can be used to optimise performance of transport

  • Oj can be used as a serializer vs JSON which is a bit slower
  • Users can implement custom codecs
diff --git a/CHANGELOG b/CHANGELOG
index 42c54a6..2029548 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,7 @@
+UNRELEASED
+
+  - FEATURE: Introduce support for transport codecs
+
 28-04-2021
 
 - Version 3.3.5
diff --git a/Gemfile b/Gemfile
index d2924dc..ea22e39 100644
--- a/Gemfile
+++ b/Gemfile
@@ -14,10 +14,12 @@ group :test do
   gem 'rack-test', require: 'rack/test'
   gem 'jasmine'
   gem 'puma'
+  gem 'm'
 end
 
 group :test, :development do
   gem 'byebug'
+  gem 'oj'
 end
 
 group :development do
diff --git a/README.md b/README.md
index 4468a48..384b277 100644
--- a/README.md
+++ b/README.md
@@ -435,6 +435,31 @@ MessageBus.configure(backend: :memory)
 
 The `:clear_every` option supported by the PostgreSQL backend is also supported by the in-memory backend.
 
+
+### Transport codecs
+
+By default MessageBus serializes messages to the backend using JSON. In most situations this performs extremely well.
+
+In some exceptional cases you may consider a different transport codec. To configure a custom codec use:
+
+`‍``ruby
+MessageBus.configure(transport_codec: codec)
+`‍``
+
+A codec class must implement MessageBus::Codec::Base, specifically the `encode` and `decode` methods.
+
+See the `bench` directory for examples where the default JSON codec can perform poorly. A specific example may be
+attempting to distribute a message to a restricted list of thousands of users. In cases like this you may consider
+using a packed string encoder.
+
+Keep in mind that much of MessageBus internals and supporting tools expect data to be converted to JSON and back. If you use a naive (and fast) `Marshal`-based codec, you may need to limit the features you use. Specifically, the PostgreSQL backend expects the codec never to return a string containing `\u0000`; additionally, some classes like DistributedCache expect keys to be converted to Strings.
+
+Another example may be very large and complicated messages where Oj in compatibility mode outperforms JSON. To opt for the Oj codec use:
+
+`‍``
+MessageBus.configure(transport_codec: MessageBus::Codec::Oj.new)
+`‍``
+
 ### Forking/threading app servers
 
 If you're using a forking or threading app server and you're not getting immediate delivery of published messages, you might need to configure your web server to re-connect to the message_bus backend
diff --git a/bench/codecs/all_codecs.rb b/bench/codecs/all_codecs.rb
new file mode 100644
index 0000000..4583c0f
--- /dev/null
+++ b/bench/codecs/all_codecs.rb
@@ -0,0 +1,39 @@
+# frozen_string_literal: true
+
+require_relative './packed_string'
+require_relative './string_hack'
+require_relative './marshal'
+
+def all_codecs
+  {
+    json: MessageBus::Codec::Json.new,
+    oj: MessageBus::Codec::Oj.new,
+    marshal: MarshalCodec.new,
+    packed_string_4_bytes: PackedString.new("V"),
+    packed_string_8_bytes: PackedString.new("Q"),
+    string_hack: StringHack.new
+  }
+end
+
+def bench_decode(hash, user_needle)
+  encoded_data = all_codecs.map do |name, codec|
+    [
+      name, codec, codec.encode(hash.dup)
+    ]
+  end
+
+  Benchmark.ips do |x|
+
+    encoded_data.each do |name, codec, encoded|
+      x.report(name) do |n|
+        while n > 0
+          decoded = codec.decode(encoded)
+          decoded["user_ids"].include?(user_needle)
+          n -= 1
+        end
+      end
+    end
+
+    x.compare!
+  end
+end
diff --git a/bench/codecs/marshal.rb b/bench/codecs/marshal.rb
new file mode 100644
index 0000000..6223549
--- /dev/null
+++ b/bench/codecs/marshal.rb
@@ -0,0 +1,11 @@
+# frozen_string_literal: true
+
+class MarshalCodec
+  def encode(hash)
+    ::Marshal.dump(hash)
+  end
+
+  def decode(payload)
+    ::Marshal.load(payload)
+  end
+end
diff --git a/bench/codecs/packed_string.rb b/bench/codecs/packed_string.rb
new file mode 100644
index 0000000..c9c9db5
--- /dev/null
+++ b/bench/codecs/packed_string.rb
@@ -0,0 +1,67 @@
+# frozen_string_literal: true
+
+class PackedString
+  class FastIdList
+    def self.from_array(array, pack_with)
+      new(array.sort.pack("#{pack_with}*"), pack_with)
+    end
+
+    def self.from_string(string, pack_with)
+      new(string, pack_with)
+    end
+
+    def initialize(packed, pack_with)
+      raise "unknown pack format, expecting Q or V" if pack_with != "V" && pack_with != "Q"
+      @packed = packed
+      @pack_with = pack_with
+      @slot_size = pack_with == "V" ? 4 : 8
+    end
+
+    def include?(id)
+      found = (0...length).bsearch do |index|
+        @packed.byteslice(index * @slot_size, @slot_size).unpack1(@pack_with) >= id
+      end
+
+      found && @packed.byteslice(found * @slot_size, @slot_size).unpack1(@pack_with) == id
+    end
+
+    def length
+      @length ||= @packed.bytesize / @slot_size
+    end
+
+    def to_a
+      @packed.unpack("#{@pack_with}*")
+    end
+
+    def to_s
+      @packed
+    end
+  end
+
+  def initialize(pack_with = "V")
+    @pack_with = pack_with
+    @oj_options = { mode: :compat }
+  end
+
+  def encode(hash)
+
+    if user_ids = hash["user_ids"]
+      hash["user_ids"] = FastIdList.from_array(hash["user_ids"], @pack_with).to_s
+    end
+
+    hash["data"] = ::Oj.dump(hash["data"], @oj_options)
+
+    Marshal.dump(hash)
+  end
+
+  def decode(payload)
+    result = Marshal.load(payload)
+    result["data"] = ::Oj.load(result["data"], @oj_options)
+
+    if str = result["user_ids"]
+      result["user_ids"] = FastIdList.from_string(str, @pack_with)
+    end
+
+    result
+  end
+end
diff --git a/bench/codecs/string_hack.rb b/bench/codecs/string_hack.rb
new file mode 100644
index 0000000..1ab190c
--- /dev/null
+++ b/bench/codecs/string_hack.rb
@@ -0,0 +1,47 @@
+# frozen_string_literal: true
+
+class StringHack
+  class FastIdList
+    def self.from_array(array)
+      new(",#{array.join(",")},")
+    end
+
+    def self.from_string(string)
+      new(string)
+    end
+
+    def initialize(packed)
+      @packed = packed
+    end
+
+    def include?(id)
+      @packed.include?(",#{id},")
+    end
+
+    def to_s
+      @packed
+    end
+  end
+
+  def initialize
+    @oj_options = { mode: :compat }
+  end
+
+  def encode(hash)
+    if user_ids = hash["user_ids"]
+      hash["user_ids"] = FastIdList.from_array(user_ids).to_s
+    end
+
+    ::Oj.dump(hash, @oj_options)
+  end
+
+  def decode(payload)
+    result = ::Oj.load(payload, @oj_options)
+
+    if str = result["user_ids"]
+      result["user_ids"] = FastIdList.from_string(str)
+    end
+
+    result
+  end
+end
diff --git a/bench/codecs_large_user_list.rb b/bench/codecs_large_user_list.rb
new file mode 100644
index 0000000..238e0a6
--- /dev/null
+++ b/bench/codecs_large_user_list.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+require 'bundler/inline'
+
+gemfile do
+  source 'https://rubygems.org'
+  gem 'message_bus', path: '../'
+  gem 'benchmark-ips'
+  gem 'oj'
+end
+
+require 'benchmark/ips'
+require 'message_bus'
+require_relative 'codecs/all_codecs'
+
+bench_decode({
+  "data" => "hello world",
+  "user_ids" => (1..10000).to_a,
+  "group_ids" => nil,
+  "client_ids" => nil
+  }, 5000
+)
+
+# packed_string_4_bytes:   127176.1 i/s
+# packed_string_8_bytes:    94494.6 i/s - 1.35x  (± 0.00) slower
+#          string_hack:    26403.4 i/s - 4.82x  (± 0.00) slower
+#              marshal:     4985.5 i/s - 25.51x  (± 0.00) slower
+#                   oj:     3072.9 i/s - 41.39x  (± 0.00) slower
+#                 json:     2222.7 i/s - 57.22x  (± 0.00) slower
diff --git a/bench/codecs_standard_message.rb b/bench/codecs_standard_message.rb
new file mode 100644
index 0000000..b8401fe
--- /dev/null
+++ b/bench/codecs_standard_message.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+require 'bundler/inline'
+
+gemfile do
+  source 'https://rubygems.org'
+  gem 'message_bus', path: '../'
+  gem 'benchmark-ips'
+  gem 'oj'
+end
+
+require 'benchmark/ips'
+require 'message_bus'
+require_relative 'codecs/all_codecs'
+
+bench_decode({

[... diff too long, it was truncated ...]

GitHub sha: 0d79789b

This commit appears in #250 which was merged by benlangfeld.