/* --------------------------------------------------------------------------
* Copyright 2006 J. Reid Morrison
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* --------------------------------------------------------------------------*/
#include "wmq.h"
#include "decode_rfh.h"
/* --------------------------------------------------
 * Ruby IDs used by the Message class
 *
 * They are interned by Message_id_init(), which is
 * called when the library is loaded by Ruby
 * --------------------------------------------------*/
/* Method names invoked with rb_funcall() */
static ID ID_data;
static ID ID_data_set;
static ID ID_descriptor;
static ID ID_headers;
static ID ID_size;
static ID ID_gsub;

/* Names used (as Symbols) for hash keys */
static ID ID_message;
static ID ID_header_type;
static ID ID_name_value;
static ID ID_xml;
/*
 * Intern every method name and hash key that this file looks up and cache
 * the resulting IDs in the file-level ID_* variables.
 */
void Message_id_init()
{
    /* Method names */
    ID_data        = rb_intern("data");
    ID_data_set    = rb_intern("data=");
    ID_descriptor  = rb_intern("descriptor");
    ID_headers     = rb_intern("headers");
    ID_size        = rb_intern("size");
    ID_gsub        = rb_intern("gsub");

    /* Hash keys */
    ID_message     = rb_intern("message");
    ID_header_type = rb_intern("header_type");
    ID_name_value  = rb_intern("name_value");
    ID_xml         = rb_intern("xml");
}
/*
 * Work out the buffer and length to be written for a put.
 *
 * parms is the Hash of named parameters; it is read for:
 *   :data     optional String. When supplied it is used as the message data
 *             in preference to message.data
 *   :message  optional object responding to data, descriptor and headers
 *
 * When :message is supplied its descriptor Hash is applied to *pmqmd.
 * If it also has one or more headers, the headers followed by the data are
 * assembled in the reusable buffer *pq_pp_buffer (which is replaced by a
 * larger one when too small, with *pq_p_buffer_size updated to match) and
 * pmqmd->Format is set according to the :header_type of the first header.
 *
 * Outputs:
 *   *pp_buffer       Start of the bytes to be written: either the storage of
 *                    the Ruby String itself, or *pq_pp_buffer
 *   *p_total_length  Number of bytes to be written
 *
 * Raises ArgumentError when neither :message nor :data is supplied.
 * Raises TypeError (through Check_Type) when a value is not of the expected type.
 */
void Message_build(PMQBYTE* pq_pp_buffer, PMQLONG pq_p_buffer_size, MQLONG trace_level,
                   VALUE parms, PPMQVOID pp_buffer, PMQLONG p_total_length, PMQMD pmqmd)
{
    VALUE data;
    VALUE descriptor;
    VALUE headers;
    VALUE message;

    /* :data is an optional parameter, that if supplied overrides message.data */
    data = rb_hash_aref(parms, ID2SYM(ID_data));
    if(!NIL_P(data))
    {
        Check_Type(data, T_STRING);
        *p_total_length = RSTRING_LEN(data);
        *pp_buffer      = RSTRING_PTR(data);
    }

    /* :message is optional */
    message = rb_hash_aref(parms, ID2SYM(ID_message));
    if(!NIL_P(message))
    {
        if (NIL_P(data))                    /* Obtain data from message.data */
        {
            data = rb_funcall(message, ID_data, 0);
        }

        descriptor = rb_funcall(message, ID_descriptor, 0);
        Check_Type(descriptor, T_HASH);
        Message_to_mqmd(descriptor, pmqmd);

        headers = rb_funcall(message, ID_headers, 0);
        Check_Type(headers, T_ARRAY);

        Check_Type(data, T_STRING);

        /* Headers present? (headers is known to be an Array at this point) */
        if (RARRAY_LEN(headers) > 0)
        {
            MQLONG  data_offset = 0;
            struct  Message_build_header_arg arg;
            VALUE   this_header;
            VALUE   next_header;
            VALUE   first_header;
            VALUE   header_type;
            VALUE   ind_val;
            long    index;

            if(trace_level>2)
                printf ("WMQ::Queue#put %ld Header(s) supplied\n", (long)RARRAY_LEN(headers));

            /* First sanity check: Do we even have enough space for the data being written and a small header */
            if(RSTRING_LEN(data) + 128 >= *pq_p_buffer_size)
            {
                MQLONG  new_size = RSTRING_LEN(data) + 512;    /* Add space for data and a header */
                PMQBYTE new_buffer;

                if(trace_level>2)
                    printf ("WMQ::Queue#reallocate Resizing buffer from %ld to %ld bytes\n", (long)*pq_p_buffer_size, (long)new_size);

                /*
                 * ALLOC_N raises on failure, so obtain the new buffer before
                 * releasing the old one and only then publish pointer and size.
                 * Otherwise the caller would be left holding a freed buffer.
                 *
                 * NOTE(review): the old buffer is released with free() whereas
                 * the new one comes from ALLOC_N - confirm that these match the
                 * allocator used where the buffer is first created.
                 */
                new_buffer = ALLOC_N(unsigned char, new_size);
                free(*pq_pp_buffer);
                *pq_pp_buffer     = new_buffer;
                *pq_p_buffer_size = new_size;
            }

            arg.pp_buffer      = pq_pp_buffer;
            arg.p_buffer_size  = pq_p_buffer_size;
            arg.data_length    = RSTRING_LEN(data);
            arg.p_data_offset  = &data_offset;
            arg.trace_level    = trace_level;
            arg.next_header_id = 0;
            arg.data_format    = pmqmd->Format;

            if(trace_level>2)
                printf ("WMQ::Queue#put Building %ld headers.\n", (long)RARRAY_LEN(headers));

            for(index = 0; index < RARRAY_LEN(headers); index++)
            {
                ind_val = LONG2FIX(index);
                this_header = rb_ary_aref(1, &ind_val, headers);
                Check_Type(this_header, T_HASH);

                /*
                 * Look at the next Header so that this header can set its format
                 * to that required for the next header.
                 *
                 * Always start from 0 so that the id chosen for the previous
                 * header is never carried over when the next header is absent
                 * or does not specify a :header_type.
                 */
                arg.next_header_id = 0;
                ind_val = LONG2FIX(index+1);
                next_header = rb_ary_aref(1, &ind_val, headers);
                if(!NIL_P(next_header))
                {
                    /* rb_hash_aref must only ever be handed a Hash */
                    Check_Type(next_header, T_HASH);
                    header_type = rb_hash_aref(next_header, ID2SYM(ID_header_type));
                    if (!NIL_P(header_type))
                    {
                        arg.next_header_id = rb_to_id(header_type);
                    }
                }

                Message_build_header(this_header, &arg);
            }

            /* Obtain Format of first header and copy in MQMD.Format */
            ind_val = LONG2FIX(0);
            first_header = rb_ary_aref(1, &ind_val, headers);
            header_type = rb_hash_aref(first_header, ID2SYM(ID_header_type));
            if (!NIL_P(header_type))
            {
                Message_build_set_format(rb_to_id(header_type), pmqmd->Format);
            }

            if(trace_level>2)
                printf ("WMQ::Queue#put done building headers. Offset is now %ld\n", (long)data_offset);

            /* The message data follows directly after the last header */
            memcpy((*pq_pp_buffer) + data_offset, RSTRING_PTR(data), RSTRING_LEN(data));
            *p_total_length = data_offset + RSTRING_LEN(data);
            *pp_buffer      = *pq_pp_buffer;
        }
        else
        {
            /* No headers: write straight from the Ruby String */
            *p_total_length = RSTRING_LEN(data);
            *pp_buffer      = RSTRING_PTR(data);
        }
    }
    /* If :message is not supplied, then :data must be supplied in the parameter list */
    else if(NIL_P(data))
    {
        rb_raise(rb_eArgError, "At least one of :message or :data is required.");
    }
    return;
}
/*
 * Populate the MQMD from the descriptor of the supplied message
 *
 * The descriptor is obtained by calling self.descriptor and is then
 * handed to Message_to_mqmd together with pmqmd
 */
void Message_build_mqmd(VALUE self, PMQMD pmqmd)
{
    VALUE descriptor = rb_funcall(self, ID_descriptor, 0);

    Message_to_mqmd(descriptor, pmqmd);
}
/*
 * call-seq:
 *   new(...)
 *
 * Optional Named Parameters (as a single hash):
 * * :data
 *   * Data to be written, or was read from the queue
 * * :descriptor
 *   * Descriptor
 *   * Defaults to an empty Hash
 * * :headers
 *   * Headers
 *   * Defaults to an empty Array
 *
 * Example:
 *   message = WMQ::Message.new
 *
 * Example:
 *   message = WMQ::Message.new(:data=>'Hello World',
 *                              :descriptor=> {
 *                                :format => WMQ::MQFMT_STRING
 *                              })
 */
VALUE Message_initialize(int argc, VALUE *argv, VALUE self)
{
    VALUE parms      = Qnil;
    VALUE data       = Qnil;
    VALUE headers    = Qnil;
    VALUE descriptor = Qnil;

    /* Extract optional parameter */
    rb_scan_args(argc, argv, "01", &parms);

    if (!NIL_P(parms))
    {
        Check_Type(parms, T_HASH);
        data       = rb_hash_aref(parms, ID2SYM(ID_data));
        headers    = rb_hash_aref(parms, ID2SYM(ID_headers));
        descriptor = rb_hash_aref(parms, ID2SYM(ID_descriptor));
    }

    /* Anything not supplied gets its own fresh, empty container */
    if (NIL_P(headers))
    {
        headers = rb_ary_new();
    }
    if (NIL_P(descriptor))
    {
        descriptor = rb_hash_new();
    }

    rb_iv_set(self, "@data", data);
    rb_iv_set(self, "@headers", headers);
    /* A missing :descriptor must default @descriptor and leave @headers alone */
    rb_iv_set(self, "@descriptor", descriptor);

    return Qnil;
}
/*
 * Reset the message: the data becomes nil and the headers are
 * replaced with a new, empty Array
 *
 * Note:
 * * The descriptor is left untouched
 *
 * Returns self
 */
VALUE Message_clear(VALUE self)
{
    VALUE empty_headers;

    rb_iv_set(self, "@data", Qnil);

    empty_headers = rb_ary_new();
    rb_iv_set(self, "@headers", empty_headers);

    return self;
}
/*
 * Automatically grow size of buffer to meet required size
 * Existing data up to offset is preserved by copying to the new buffer
 *
 * parg:            Describes the buffer: pointer, size, current offset and
 *                  the length of the data still to be appended
 * additional_size: Size of any additional data to be written to this buffer
 *                  EXCLUDING current offset and size of data to be written
 *
 * The buffer is replaced when offset + data length + additional_size is not
 * strictly less than the current buffer size. The replacement has a further
 * 512 bytes of head room for subsequent headers.
 *
 * Returns pointer to new buffer incremented by offset within buffer
 */
PMQBYTE Message_autogrow_data_buffer(struct Message_build_header_arg* parg, MQLONG additional_size)
{
    MQLONG required = *(parg->p_data_offset) + parg->data_length + additional_size;

    /* Is buffer large enough for headers */
    if(required >= *(parg->p_buffer_size))
    {
        PMQBYTE old_buffer = *(parg->pp_buffer);
        PMQBYTE new_buffer;
        MQLONG  new_size   = required + 512;         /* Additional space for subsequent headers */

        if(parg->trace_level>2)
            printf ("WMQ::Message Reallocating buffer from %ld to %ld\n", (long)*(parg->p_buffer_size), (long)new_size);

        /*
         * ALLOC_N raises on failure. Allocate before touching the caller's
         * pointer and size so that they still describe the old, valid buffer
         * should that happen.
         */
        new_buffer = ALLOC_N(unsigned char, new_size);
        memcpy(new_buffer, old_buffer, *(parg->p_data_offset));
        free(old_buffer);

        *(parg->pp_buffer)     = new_buffer;
        *(parg->p_buffer_size) = new_size;
    }
    return *(parg->pp_buffer) + *(parg->p_data_offset);
}
/*
 * Append one name or value element to the string being built
 *
 * The element is converted with StringValue and then appended as follows:
 *   - an empty element is appended as ""
 *   - an element without spaces or double quotes is appended unchanged
 *   - anything else is wrapped in double quotes, with every embedded
 *     double quote doubled
 */
static void Message_name_value_concat(VALUE string, VALUE element)
{
    VALUE str      = StringValue(element);
    long  text_len = RSTRING_LEN(str);
    int   has_space;
    int   has_dbl_quote;
    VALUE quote;

    if (text_len == 0)                               /* Empty String: "" */
    {
        rb_str_concat(string, rb_str_new2("\"\""));
        return;
    }

    has_space     = (memchr(RSTRING_PTR(str), ' ', text_len) != NULL);
    has_dbl_quote = (memchr(RSTRING_PTR(str), '"', text_len) != NULL);

    if (!has_space && !has_dbl_quote)                /* Nothing to protect */
    {
        rb_str_concat(string, str);
        return;
    }

    quote = rb_str_new2("\"");
    rb_str_concat(string, quote);
    rb_str_concat(string,
                  has_dbl_quote ? rb_funcall(str, ID_gsub, 2, quote, rb_str_new2("\"\""))
                                : str);
    rb_str_concat(string, quote);
}
/*
 * Values handed to Message_build_rf_header_each_value for every element
 * of an Array value
 */
struct Message_build_rf_header_each_value_arg {
VALUE string; /* String being appended to */
VALUE key; /* Name that is repeated in front of every value */
VALUE space; /* Separator appended after every name and every value */
};
/*
 * Called for every element of an Array value
 *
 * Appends "name value " to the string held in parg, using the key
 * and separator that are also held in parg. Always returns nil.
 */
static VALUE Message_build_rf_header_each_value(VALUE value, struct Message_build_rf_header_each_value_arg* parg)
{
    VALUE target    = parg->string;
    VALUE separator = parg->space;

    Message_name_value_concat(target, parg->key);
    rb_str_concat(target, separator);

    Message_name_value_concat(target, value);
    rb_str_concat(target, separator);

    return Qnil;
}
/*
 * Called by rb_hash_foreach for every name/value pair of a :name_value Hash
 *
 * Appends "name value " to string. When the value is an Array the name
 * is repeated in front of every element of that Array.
 *
 * Always returns 0 (presumably ST_CONTINUE, so that iteration carries on)
 */
static int Message_build_rf_header_each (VALUE key, VALUE value, VALUE string)
{
    VALUE space = rb_str_new2(" ");
    struct Message_build_rf_header_each_value_arg arg;

    if (TYPE(value) != T_ARRAY)
    {
        /* Single value */
        Message_name_value_concat(string, key);
        rb_str_concat(string, space);
        Message_name_value_concat(string, value);
        rb_str_concat(string, space);
        return 0;
    }

    /* Array of values: one name/value pair per element */
    arg.string = string;
    arg.key    = key;
    arg.space  = space;
    rb_iterate (rb_each, value, Message_build_rf_header_each_value, (VALUE)&arg);

    return 0;
}
/*
 * Append an MQRFH header, followed by its name value string, to the buffer
 *
 * hash: The header supplied by the caller. :name_value may be
 *         - nil:    no name value string is written
 *         - String: written as is
 *         - Hash:   converted to "name value name value ..." first
 *       Anything else raises ArgumentError
 * parg: Buffer being built. The buffer is grown when required and the
 *       data offset is advanced past everything written here
 *
 * The name value string is padded with spaces up to a multiple of 4 bytes
 * and StrucLength is set to the size of MQRFH plus that padded length.
 * Format is set for the next header when there is one, otherwise it is
 * copied from parg->data_format.
 */
void Message_build_rf_header (VALUE hash, struct Message_build_header_arg* parg)
{
    static  MQRFH MQRFH_DEF = {MQRFH_DEFAULT};
    PMQBYTE p_data;
    MQLONG  raw_len        = 0;                      /* Length of name value string as supplied */
    MQLONG  pad_len        = 0;                      /* Spaces needed to reach a 4 byte boundary */
    MQLONG  name_value_len = 0;                      /* raw_len + pad_len */
    VALUE   name_value     = rb_hash_aref(hash, ID2SYM(ID_name_value));

    MQRFH_DEF.CodedCharSetId = MQCCSI_INHERIT;

    if(parg->trace_level>2)
        printf ("WMQ::Message#build_rf_header Found rf_header\n");

    if (!NIL_P(name_value))
    {
        if (TYPE(name_value) == T_HASH)
        {
            VALUE name_value_str = rb_str_buf_new(512);  /* Allocate 512 char buffer, will grow as needed */
            rb_hash_foreach(name_value, Message_build_rf_header_each, name_value_str);
            name_value = name_value_str;
        }
        else if(TYPE(name_value) != T_STRING)
        {
            rb_raise(rb_eArgError, ":name_value supplied in rf_header to WMQ::Message#headers must be either a String or a Hash");
        }

        raw_len = RSTRING_LEN(name_value);
        if (raw_len % 4)                             /* Not on 4 byte boundary ? */
        {
            pad_len = 4 - (raw_len % 4);
        }
        name_value_len = raw_len + pad_len;
    }

    p_data = Message_autogrow_data_buffer(parg, sizeof(MQRFH)+name_value_len);

    memcpy(p_data, &MQRFH_DEF, sizeof(MQRFH));
    Message_to_mqrfh(hash, (PMQRFH)p_data);
    ((PMQRFH)p_data)->StrucLength = sizeof(MQRFH) + name_value_len;
    if(parg->next_header_id)
    {
        Message_build_set_format(parg->next_header_id, ((PMQRFH)p_data)->Format);
    }
    else
    {
        strncpy(((PMQRFH)p_data)->Format, parg->data_format, MQ_FORMAT_LENGTH);
    }

    *(parg->p_data_offset) += sizeof(MQRFH);
    p_data += sizeof(MQRFH);

    if(name_value_len)
    {
        /*
         * The pad spaces are written directly into the buffer. That way a
         * String supplied by the caller is never modified, and no more pad
         * bytes than exist are ever read from a literal.
         * The String's data pointer is only taken here, after all calls that
         * may allocate, while name_value is still referenced.
         */
        memcpy(p_data, RSTRING_PTR(name_value), raw_len);
        memset(p_data + raw_len, ' ', pad_len);
        *(parg->p_data_offset) += name_value_len;
    }

    if(parg->trace_level>3)
        printf ("WMQ::Message#build_rf_header Sizeof namevalue string:%ld\n", (long)name_value_len);

    if(parg->trace_level>2)
        printf ("WMQ::Message#build_rf_header data offset:%ld\n", (long)*(parg->p_data_offset));
}
/*
 * Callback handed to rfh_decode_name_val_str by Message_deblock_rf_header
 *
 * Stores the pair in the Hash passed as p_name_value_hash.
 * When the same name is seen more than once, its values are collected in
 * an Array: the first repeat turns the entry into [first, second] and
 * every further repeat is pushed onto that Array.
 */
static void Message_deblock_rf_header_each_pair(const char *p_name, const char *p_value, void* p_name_value_hash)
{
    VALUE name_value_hash = (VALUE)p_name_value_hash;
    VALUE key             = rb_str_new2(p_name);
    VALUE value           = rb_str_new2(p_value);
    VALUE existing        = rb_hash_aref(name_value_hash, key);
    VALUE array;

    if(NIL_P(existing))                              /* First value for this name */
    {
        rb_hash_aset(name_value_hash, key, value);
        return;
    }

    if(TYPE(existing) == T_ARRAY)                    /* Third or later value */
    {
        rb_ary_push(existing, value);
        return;
    }

    /* Second value: replace the single entry with an Array holding both */
    array = rb_ary_new();
    rb_ary_push(array, existing);
    rb_ary_push(array, value);
    rb_hash_aset(name_value_hash, key, array);
}
/*
 * Deblock Any custom data following RF Header
 *   The RF Header has already been deblocked into hash
 *   p_data points to the beginning of the RF Header structure
 *
 *   data_len is the length of the remainder of the message from this header onwards
 *   It includes the length of any data that may follow
 *
 * The name value string is decoded into a Hash that is stored in hash
 * under :name_value. A name value string that cannot be parsed is reported
 * on stdout; whatever was decoded up to that point is still stored.
 *
 * Returns the length of RF Header plus the size of any custom data following it
 * Returns 0 for a poison message, that is when StrucLength is smaller than
 * the MQRFH structure itself or larger than data_len
 */
MQLONG Message_deblock_rf_header (VALUE hash, PMQBYTE p_data, MQLONG data_len)
{
    MQLONG        size;
    VALUE         name_value_hash;
    rfh_toktype_t toktype;

    /* StrucLength can only be trusted when a complete MQRFH is present */
    if(data_len < (MQLONG)sizeof(MQRFH))             /* Poison Message */
    {
        printf("WMQ::Message_deblock_rf_header Message is too short to contain an MQRFH\n");
        return 0;
    }

    size = ((PMQRFH)p_data)->StrucLength;

    if(size < 0 || size > data_len)                  /* Poison Message */
    {
        printf("WMQ::Message_deblock_rf_header StrucLength supplied in MQRFH exceeds total message length\n");
        return 0;
    }

    /*
     * A StrucLength below the fixed part of the header would make the
     * subtraction further down wrap around to a huge unsigned length
     */
    if(size < (MQLONG)sizeof(MQRFH))                 /* Poison Message */
    {
        printf("WMQ::Message_deblock_rf_header StrucLength supplied in MQRFH is smaller than the MQRFH structure\n");
        return 0;
    }

    name_value_hash = rb_hash_new();

    toktype = rfh_decode_name_val_str(p_data + sizeof(MQRFH),
                                      size - sizeof(MQRFH),
                                      Message_deblock_rf_header_each_pair,
                                      (void*)name_value_hash);

    if (toktype != TT_END)
    {
        printf("Could not parse rfh name value string, reason %s\n",rfh_toktype_to_s(toktype));
    }

    rb_hash_aset(hash, ID2SYM(ID_name_value), name_value_hash);

    return size;
}
/*
 * RFH2 Header can contain multiple XML-like strings
 *   Message consists of:
 *     MQRFH2
 *     xml-string1-length (MQLONG)
 *     xml-string1 (Padded with spaces to match 4 byte boundary)
 *     xml-string2-length (MQLONG)
 *     xml-string2 (Padded with spaces to match 4 byte boundary)
 *     ....
 *
 * hash:     Receives the Array of strings under :xml. Trailing pad spaces
 *           are removed from every string
 * p_buffer: Points to the beginning of the MQRFH2 structure
 * data_len: Length of the remainder of the message from this header onwards
 *
 * Returns StrucLength of the MQRFH2
 * Returns 0 for a poison message, that is when StrucLength is smaller than
 * the MQRFH2 structure itself or larger than data_len.
 * A bad string length inside the header stops any further strings from being
 * extracted, the strings found before it are kept.
 */
MQLONG Message_deblock_rf_header_2 (VALUE hash, PMQBYTE p_buffer, MQLONG data_len)
{
    MQLONG  size;
    PMQBYTE p_data;
    PMQBYTE p_end;
    MQLONG  xml_len = 0;
    MQLONG  length;
    VALUE   xml_ary;

    /* StrucLength can only be trusted when a complete MQRFH2 is present */
    if(data_len < (MQLONG)sizeof(MQRFH2))            /* Poison Message */
    {
        printf("WMQ::Message_deblock_rf_header_2 Message is too short to contain an MQRFH2\n");
        return 0;
    }

    size = ((PMQRFH2)p_buffer)->StrucLength;

    if(size < 0 || size > data_len)                  /* Poison Message */
    {
        printf("WMQ::Message_deblock_rf_header_2 StrucLength supplied in MQRFH exceeds total message length\n");
        return 0;
    }

    if(size < (MQLONG)sizeof(MQRFH2))                /* Poison Message */
    {
        printf("WMQ::Message_deblock_rf_header_2 StrucLength supplied in MQRFH2 is smaller than the MQRFH2 structure\n");
        return 0;
    }

    /* Only now is it safe to derive pointers from StrucLength */
    p_data = p_buffer + sizeof(MQRFH2);
    p_end  = p_buffer + size;                        /* Points to byte after last character */

    xml_ary = rb_ary_new();
    rb_hash_aset(hash, ID2SYM(ID_xml), xml_ary);

    while(p_data < p_end)
    {
        /* The length indicator itself must lie completely within the header */
        if ((size_t)(p_end - p_data) < sizeof(MQLONG))
        {
            printf("WMQ::Message#deblock_rf_header_2 Poison Message received, stopping further processing\n");
            break;
        }

        /* Copy rather than dereference: p_data is a byte pointer */
        memcpy(&xml_len, p_data, sizeof(MQLONG));
        p_data += sizeof(MQLONG);

        /* Compare lengths, not pointers, so that a huge xml_len cannot wrap */
        if (xml_len < 0 || xml_len > (MQLONG)(p_end - p_data))
        {
            printf("WMQ::Message#deblock_rf_header_2 Poison Message received, stopping further processing\n");
            break;
        }

        /*
         * Strip trailing spaces added as pad characters during put
         */
        length = xml_len;
        while (length > 0 && p_data[length - 1] == ' ')
        {
            length--;
        }

        rb_ary_push(xml_ary, rb_str_new((const char*)p_data, length));
        p_data += xml_len;
    }

    return size;
}
/*
 * Append one XML-like string of an RFH2 header to the buffer
 *
 * Written are:
 *   - an MQLONG holding the length of the string including its padding
 *   - the string itself
 *   - as many spaces as are needed to reach a 4 byte boundary
 *
 * element is converted with StringValue. The buffer described by parg is
 * grown when required and its data offset is advanced past the bytes written.
 *
 * Always returns nil
 */
static VALUE Message_build_rf_header_2_each(VALUE element, struct Message_build_header_arg* parg)
{
    VALUE   str    = StringValue(element);
    MQLONG  length = RSTRING_LEN(str);
    MQLONG  pad    = (length % 4) ? 4 - (length % 4) : 0;   /* 0..3 spaces */
    MQLONG  len    = length + pad;
    PMQBYTE p_data = Message_autogrow_data_buffer(parg, sizeof(len) + len);

    memcpy(p_data, (void*) &len, sizeof(len));       /* Start with MQLONG length indicator */
    p_data += sizeof(len);

    memcpy(p_data, RSTRING_PTR(str), length);
    p_data += length;

    /* Generate the padding instead of copying it out of a string literal */
    memset(p_data, ' ', pad);

    *(parg->p_data_offset) += sizeof(len) + len;

    return Qnil;
}
/*
 * RFH2 Header can contain multiple XML-like strings
 *   Message consists of:
 *     MQRFH2
 *     xml-string1-length (MQLONG)
 *     xml-string1 (Padded with spaces to match 4 byte boundary)
 *     xml-string2-length (MQLONG)
 *     xml-string2 (Padded with spaces to match 4 byte boundary)
 *     ....
 *
 * The input data is either a String or an Array of Strings
 * E.g.
 *
 *  {
 *    :header_type =>:rf_header_2,
 *    :xml => 'to the world'
 *  }
 * OR
 *  {
 *    :header_type =>:rf_header_2,
 *    :xml => ['to the world', 'xml like string'],
 *  }
 *
 * hash: The header supplied by the caller. :xml may be nil, in which case
 *       only the MQRFH2 structure is written. Any :xml that is neither a
 *       String nor an Array raises ArgumentError
 * parg: Buffer being built. The buffer is grown when required and the
 *       data offset is advanced past everything written here
 *
 * Format is set for the next header when there is one, otherwise it is
 * copied from parg->data_format. StrucLength is set to the number of bytes
 * written, that is the MQRFH2 plus all of the strings.
 */
void Message_build_rf_header_2(VALUE hash, struct Message_build_header_arg* parg)
{
    static  MQRFH2 MQRFH2_DEF = {MQRFH2_DEFAULT};
    MQLONG  rfh2_offset = *(parg->p_data_offset);
    PMQBYTE p_data;
    VALUE   xml = rb_hash_aref(hash, ID2SYM(ID_xml));

    if(parg->trace_level>2)
        printf ("WMQ::Message#build_rf_header_2 Found rf_header_2\n");

    /* Add MQRFH2 Header, ensure that there is enough space */
    p_data = Message_autogrow_data_buffer(parg, sizeof(MQRFH2));
    memcpy(p_data, &MQRFH2_DEF, sizeof(MQRFH2));
    Message_to_mqrfh2(hash, (PMQRFH2)p_data);
    if(parg->next_header_id)
    {
        Message_build_set_format(parg->next_header_id, ((PMQRFH2)p_data)->Format);
    }
    else
    {
        memcpy(((PMQRFH2)p_data)->Format, parg->data_format, MQ_FORMAT_LENGTH);
    }
    *(parg->p_data_offset) += sizeof(MQRFH2);

    if (!NIL_P(xml))
    {
        if(TYPE(xml) == T_ARRAY)
        {
            rb_iterate (rb_each, xml, Message_build_rf_header_2_each, (VALUE)parg);
        }
        else if(TYPE(xml) == T_STRING)
        {
            /* A single String is written exactly like a one element Array */
            Message_build_rf_header_2_each(xml, parg);
        }
        else
        {
            rb_raise(rb_eArgError, ":xml supplied in rf_header_2 to WMQ::Message#headers must be a String or an Array");
        }
    }

    /*
     * Set total length in MQRFH2
     * The buffer may have been replaced while the strings were added, so the
     * header is located again from the current buffer and the saved offset
     */
    ((PMQRFH2)(*(parg->pp_buffer) + rfh2_offset))->StrucLength = *(parg->p_data_offset) - rfh2_offset;

    if(parg->trace_level>2)
        printf ("WMQ::Message#build_rf_header_2 data offset:%ld\n", (long)*(parg->p_data_offset));
}