Queue storages have limit on how much they can store. In order to accommodate the payloads or messages larger than the limit, most of the Enterprise systems compress the message and publish to the queue.
In Mule 4, there are modules available for compressing and decompressing the message. When a publisher publishes message to queue and while retrieving the message using queue listener, it needs to be decompressed before processing. For this purpose we will place a “Decompress” connector after reading the payload and it accepts “payload” as the Compressed input.
Note: Here the queue referred to in the post is Azure Service Bus Queue and compression used is GZIP.
You can refer to the second flow in the previous post that talks about publish/subscribe pattern using Azure Service Bus and Mule.
The original flow is as below:
In this flow, in order to decompress the message we will place “Decompress” connector after reading payload as below:
“Decompress” connector properties are as below:
When we run the program, we get the error:
Message : There are at least two transformers that are an exact match for source type “class [B” and target type “class java.io.InputStream”. Transformers are: “[org.mule.runtime.core.privileged.transformer.CompositeConverter@743dfcf0, org.mule.runtime.core.privileged.transformer.CompositeConverter@4028a29a]”. Check your transformer use to avoid implicit transformations and choose between the two options.
Error type : MULE:TRANSFORMATION
Let us debug the flow and see why we got the error.
If you observe the payload is in the format of byte array []. Mule runtime by default transforms the payload to destination type if there is a one transformation available. If there are multiple transformations available, it errors out with an exception as in our case.
If you keenly observe the exception, it says at least two transformers are available with source and target type. The target type is “java.io.InputStream“. In this scenario, we need to note the target type and transform our payload to that type.
In order to fix the exception, I have added a java class that takes Byte array as input and returns java.io.InputStream.
Here is my class:
package compressasbanddecompress;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
public class test {
public static java.io.InputStream getStreamData(byte[] data){
InputStream myInputStream = new ByteArrayInputStream(data);
return myInputStream;
}
}
Now before “Decompress” connector, I have placed a transformer to call the java class and transform the message. The transformer calls the java class method as it is a static method. The output will now be a of format java.io.InputStream. This should fix the exception that has occurred earlier.
Note: Since “Decompress” message transforms the payload, you will lose the user properties and ackId after it is processed. So, you need to make sure you capture all the user properties and ackId before “Decompress” is executed.
Also in my flow first logger message is changed to “message before decompression #[payload]” and second logger message is changed to “message after de-compression #[payload]” to capture the messages while flow is executed. If I check the logs, I see message before and after decompression as below:
org.mule.runtime.core.internal.processor.LoggerMessageProcessor: message before decompression �???????����K�M�R
H-I-R���LL�L�@̪̂�� �����D��?Q+U .???
INFO 2019-07-14 21:40:51,928 [[MuleRuntime].cpuLight.13: [compressasbanddecompress].subscriberFlow.CPU_LITE @24829254] [event: c70930f0-a69f-11e9-a0f2-f45c898a2395] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: message after de-compression {
name:"Peter",
age:"42",
zipcode:"20874"
}
Hope this helps. If you find this post useful, do not forget to share with your friends. Happy coding!!!