001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.openwire; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.IOException; 022import java.lang.reflect.Method; 023import java.util.HashMap; 024import java.util.Map; 025 026import org.apache.activemq.command.CommandTypes; 027import org.apache.activemq.command.DataStructure; 028import org.apache.activemq.command.WireFormatInfo; 029import org.apache.activemq.util.ByteSequence; 030import org.apache.activemq.util.ByteSequenceData; 031import org.apache.activemq.util.DataByteArrayInputStream; 032import org.apache.activemq.util.DataByteArrayOutputStream; 033import org.apache.activemq.util.IOExceptionSupport; 034import org.apache.activemq.wireformat.WireFormat; 035 036/** 037 * 038 * 039 */ 040public final class OpenWireFormat implements WireFormat { 041 042 public static final int DEFAULT_STORE_VERSION = CommandTypes.PROTOCOL_STORE_VERSION; 043 public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION; 044 public static final int DEFAULT_LEGACY_VERSION = CommandTypes.PROTOCOL_LEGACY_STORE_VERSION; 045 public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE; 046 047 static final byte NULL_TYPE = CommandTypes.NULL; 048 private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2; 049 private static final int MARSHAL_CACHE_FREE_SPACE = 100; 050 051 private DataStreamMarshaller dataMarshallers[]; 052 private int version; 053 private boolean stackTraceEnabled; 054 private boolean tcpNoDelayEnabled; 055 private boolean cacheEnabled; 056 private boolean tightEncodingEnabled; 057 private boolean sizePrefixDisabled; 058 private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; 059 060 // The following fields are used for value caching 061 private short nextMarshallCacheIndex; 062 private short nextMarshallCacheEvictionIndex; 063 private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>(); 064 private DataStructure marshallCache[] = null; 065 private DataStructure unmarshallCache[] = null; 066 private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream(); 067 private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); 068 private WireFormatInfo preferedWireFormatInfo; 069 070 public OpenWireFormat() { 071 this(DEFAULT_STORE_VERSION); 072 } 073 074 public OpenWireFormat(int i) { 075 setVersion(i); 076 } 077 078 @Override 079 public int hashCode() { 080 return version ^ (cacheEnabled ? 0x10000000 : 0x20000000) 081 ^ (stackTraceEnabled ? 0x01000000 : 0x02000000) 082 ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000) 083 ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000); 084 } 085 086 public OpenWireFormat copy() { 087 OpenWireFormat answer = new OpenWireFormat(version); 088 answer.stackTraceEnabled = stackTraceEnabled; 089 answer.tcpNoDelayEnabled = tcpNoDelayEnabled; 090 answer.cacheEnabled = cacheEnabled; 091 answer.tightEncodingEnabled = tightEncodingEnabled; 092 answer.sizePrefixDisabled = sizePrefixDisabled; 093 answer.preferedWireFormatInfo = preferedWireFormatInfo; 094 return answer; 095 } 096 097 @Override 098 public boolean equals(Object object) { 099 if (object == null) { 100 return false; 101 } 102 OpenWireFormat o = (OpenWireFormat)object; 103 return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled 104 && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled 105 && o.sizePrefixDisabled == sizePrefixDisabled; 106 } 107 108 109 @Override 110 public String toString() { 111 return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" 112 + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}"; 113 // return "OpenWireFormat{id="+id+", 114 // tightEncodingEnabled="+tightEncodingEnabled+"}"; 115 } 116 117 @Override 118 public int getVersion() { 119 return version; 120 } 121 122 @Override 123 public synchronized ByteSequence marshal(Object command) throws IOException { 124 125 if (cacheEnabled) { 126 runMarshallCacheEvictionSweep(); 127 } 128 129 ByteSequence sequence = null; 130 int size = 1; 131 if (command != null) { 132 133 DataStructure c = (DataStructure)command; 134 byte type = c.getDataStructureType(); 135 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 136 if (dsm == null) { 137 throw new IOException("Unknown data type: " + type); 138 } 139 if (tightEncodingEnabled) { 140 141 BooleanStream bs = new BooleanStream(); 142 size += dsm.tightMarshal1(this, c, bs); 143 size += bs.marshalledSize(); 144 145 bytesOut.restart(size); 146 if (!sizePrefixDisabled) { 147 bytesOut.writeInt(size); 148 } 149 bytesOut.writeByte(type); 150 bs.marshal(bytesOut); 151 dsm.tightMarshal2(this, c, bytesOut, bs); 152 sequence = bytesOut.toByteSequence(); 153 154 } else { 155 bytesOut.restart(); 156 if (!sizePrefixDisabled) { 157 bytesOut.writeInt(0); // we don't know the final size 158 // yet but write this here for 159 // now. 160 } 161 bytesOut.writeByte(type); 162 dsm.looseMarshal(this, c, bytesOut); 163 sequence = bytesOut.toByteSequence(); 164 165 if (!sizePrefixDisabled) { 166 size = sequence.getLength() - 4; 167 int pos = sequence.offset; 168 ByteSequenceData.writeIntBig(sequence, size); 169 sequence.offset = pos; 170 } 171 } 172 173 } else { 174 bytesOut.restart(5); 175 bytesOut.writeInt(size); 176 bytesOut.writeByte(NULL_TYPE); 177 sequence = bytesOut.toByteSequence(); 178 } 179 180 return sequence; 181 } 182 183 @Override 184 public synchronized Object unmarshal(ByteSequence sequence) throws IOException { 185 bytesIn.restart(sequence); 186 // DataInputStream dis = new DataInputStream(new 187 // ByteArrayInputStream(sequence)); 188 189 if (!sizePrefixDisabled) { 190 int size = bytesIn.readInt(); 191 if (sequence.getLength() - 4 != size) { 192 // throw new IOException("Packet size does not match marshaled 193 // size"); 194 } 195 196 if (size > maxFrameSize) { 197 throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize); 198 } 199 } 200 201 Object command = doUnmarshal(bytesIn); 202 // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) { 203 // ((MarshallAware) command).setCachedMarshalledForm(this, sequence); 204 // } 205 return command; 206 } 207 208 @Override 209 public synchronized void marshal(Object o, DataOutput dataOut) throws IOException { 210 211 if (cacheEnabled) { 212 runMarshallCacheEvictionSweep(); 213 } 214 215 int size = 1; 216 if (o != null) { 217 218 DataStructure c = (DataStructure)o; 219 byte type = c.getDataStructureType(); 220 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 221 if (dsm == null) { 222 throw new IOException("Unknown data type: " + type); 223 } 224 if (tightEncodingEnabled) { 225 BooleanStream bs = new BooleanStream(); 226 size += dsm.tightMarshal1(this, c, bs); 227 size += bs.marshalledSize(); 228 229 if (!sizePrefixDisabled) { 230 dataOut.writeInt(size); 231 } 232 233 dataOut.writeByte(type); 234 bs.marshal(dataOut); 235 dsm.tightMarshal2(this, c, dataOut, bs); 236 237 } else { 238 DataOutput looseOut = dataOut; 239 240 if (!sizePrefixDisabled) { 241 bytesOut.restart(); 242 looseOut = bytesOut; 243 } 244 245 looseOut.writeByte(type); 246 dsm.looseMarshal(this, c, looseOut); 247 248 if (!sizePrefixDisabled) { 249 ByteSequence sequence = bytesOut.toByteSequence(); 250 dataOut.writeInt(sequence.getLength()); 251 dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength()); 252 } 253 254 } 255 256 } else { 257 if (!sizePrefixDisabled) { 258 dataOut.writeInt(size); 259 } 260 dataOut.writeByte(NULL_TYPE); 261 } 262 } 263 264 @Override 265 public Object unmarshal(DataInput dis) throws IOException { 266 DataInput dataIn = dis; 267 if (!sizePrefixDisabled) { 268 int size = dis.readInt(); 269 if (size > maxFrameSize) { 270 throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize); 271 } 272 // int size = dis.readInt(); 273 // byte[] data = new byte[size]; 274 // dis.readFully(data); 275 // bytesIn.restart(data); 276 // dataIn = bytesIn; 277 } 278 return doUnmarshal(dataIn); 279 } 280 281 /** 282 * Used by NIO or AIO transports 283 */ 284 public int tightMarshal1(Object o, BooleanStream bs) throws IOException { 285 int size = 1; 286 if (o != null) { 287 DataStructure c = (DataStructure)o; 288 byte type = c.getDataStructureType(); 289 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 290 if (dsm == null) { 291 throw new IOException("Unknown data type: " + type); 292 } 293 294 size += dsm.tightMarshal1(this, c, bs); 295 size += bs.marshalledSize(); 296 } 297 return size; 298 } 299 300 /** 301 * Used by NIO or AIO transports; note that the size is not written as part 302 * of this method. 303 */ 304 public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException { 305 if (cacheEnabled) { 306 runMarshallCacheEvictionSweep(); 307 } 308 309 if (o != null) { 310 DataStructure c = (DataStructure)o; 311 byte type = c.getDataStructureType(); 312 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 313 if (dsm == null) { 314 throw new IOException("Unknown data type: " + type); 315 } 316 ds.writeByte(type); 317 bs.marshal(ds); 318 dsm.tightMarshal2(this, c, ds, bs); 319 } 320 } 321 322 /** 323 * Allows you to dynamically switch the version of the openwire protocol 324 * being used. 325 * 326 * @param version 327 */ 328 @Override 329 public void setVersion(int version) { 330 String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory"; 331 Class mfClass; 332 try { 333 mfClass = Class.forName(mfName, false, getClass().getClassLoader()); 334 } catch (ClassNotFoundException e) { 335 throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version 336 + ", could not load " + mfName) 337 .initCause(e); 338 } 339 try { 340 Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class}); 341 dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this}); 342 } catch (Throwable e) { 343 throw (IllegalArgumentException)new IllegalArgumentException( 344 "Invalid version: " 345 + version 346 + ", " 347 + mfName 348 + " does not properly implement the createMarshallerMap method.") 349 .initCause(e); 350 } 351 this.version = version; 352 } 353 354 public Object doUnmarshal(DataInput dis) throws IOException { 355 byte dataType = dis.readByte(); 356 if (dataType != NULL_TYPE) { 357 DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF]; 358 if (dsm == null) { 359 throw new IOException("Unknown data type: " + dataType); 360 } 361 Object data = dsm.createObject(); 362 if (this.tightEncodingEnabled) { 363 BooleanStream bs = new BooleanStream(); 364 bs.unmarshal(dis); 365 dsm.tightUnmarshal(this, data, dis, bs); 366 } else { 367 dsm.looseUnmarshal(this, data, dis); 368 } 369 return data; 370 } else { 371 return null; 372 } 373 } 374 375 // public void debug(String msg) { 376 // String t = (Thread.currentThread().getName()+" ").substring(0, 40); 377 // System.out.println(t+": "+msg); 378 // } 379 public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException { 380 bs.writeBoolean(o != null); 381 if (o == null) { 382 return 0; 383 } 384 385 if (o.isMarshallAware()) { 386 // MarshallAware ma = (MarshallAware)o; 387 ByteSequence sequence = null; 388 // sequence=ma.getCachedMarshalledForm(this); 389 bs.writeBoolean(sequence != null); 390 if (sequence != null) { 391 return 1 + sequence.getLength(); 392 } 393 } 394 395 byte type = o.getDataStructureType(); 396 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 397 if (dsm == null) { 398 throw new IOException("Unknown data type: " + type); 399 } 400 return 1 + dsm.tightMarshal1(this, o, bs); 401 } 402 403 public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs) 404 throws IOException { 405 if (!bs.readBoolean()) { 406 return; 407 } 408 409 byte type = o.getDataStructureType(); 410 ds.writeByte(type); 411 412 if (o.isMarshallAware() && bs.readBoolean()) { 413 414 // We should not be doing any caching 415 throw new IOException("Corrupted stream"); 416 // MarshallAware ma = (MarshallAware) o; 417 // ByteSequence sequence=ma.getCachedMarshalledForm(this); 418 // ds.write(sequence.getData(), sequence.getOffset(), 419 // sequence.getLength()); 420 421 } else { 422 423 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 424 if (dsm == null) { 425 throw new IOException("Unknown data type: " + type); 426 } 427 dsm.tightMarshal2(this, o, ds, bs); 428 429 } 430 } 431 432 public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException { 433 if (bs.readBoolean()) { 434 435 byte dataType = dis.readByte(); 436 DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF]; 437 if (dsm == null) { 438 throw new IOException("Unknown data type: " + dataType); 439 } 440 DataStructure data = dsm.createObject(); 441 442 if (data.isMarshallAware() && bs.readBoolean()) { 443 444 dis.readInt(); 445 dis.readByte(); 446 447 BooleanStream bs2 = new BooleanStream(); 448 bs2.unmarshal(dis); 449 dsm.tightUnmarshal(this, data, dis, bs2); 450 451 // TODO: extract the sequence from the dis and associate it. 452 // MarshallAware ma = (MarshallAware)data 453 // ma.setCachedMarshalledForm(this, sequence); 454 455 } else { 456 dsm.tightUnmarshal(this, data, dis, bs); 457 } 458 459 return data; 460 } else { 461 return null; 462 } 463 } 464 465 public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException { 466 if (dis.readBoolean()) { 467 468 byte dataType = dis.readByte(); 469 DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF]; 470 if (dsm == null) { 471 throw new IOException("Unknown data type: " + dataType); 472 } 473 DataStructure data = dsm.createObject(); 474 dsm.looseUnmarshal(this, data, dis); 475 return data; 476 477 } else { 478 return null; 479 } 480 } 481 482 public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException { 483 dataOut.writeBoolean(o != null); 484 if (o != null) { 485 byte type = o.getDataStructureType(); 486 dataOut.writeByte(type); 487 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 488 if (dsm == null) { 489 throw new IOException("Unknown data type: " + type); 490 } 491 dsm.looseMarshal(this, o, dataOut); 492 } 493 } 494 495 public void runMarshallCacheEvictionSweep() { 496 // Do we need to start evicting?? 497 while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) { 498 499 marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]); 500 marshallCache[nextMarshallCacheEvictionIndex] = null; 501 502 nextMarshallCacheEvictionIndex++; 503 if (nextMarshallCacheEvictionIndex >= marshallCache.length) { 504 nextMarshallCacheEvictionIndex = 0; 505 } 506 507 } 508 } 509 510 public Short getMarshallCacheIndex(DataStructure o) { 511 return marshallCacheMap.get(o); 512 } 513 514 public Short addToMarshallCache(DataStructure o) { 515 short i = nextMarshallCacheIndex++; 516 if (nextMarshallCacheIndex >= marshallCache.length) { 517 nextMarshallCacheIndex = 0; 518 } 519 520 // We can only cache that item if there is space left. 521 if (marshallCacheMap.size() < marshallCache.length) { 522 marshallCache[i] = o; 523 Short index = new Short(i); 524 marshallCacheMap.put(o, index); 525 return index; 526 } else { 527 // Use -1 to indicate that the value was not cached due to cache 528 // being full. 529 return new Short((short)-1); 530 } 531 } 532 533 public void setInUnmarshallCache(short index, DataStructure o) { 534 535 // There was no space left in the cache, so we can't 536 // put this in the cache. 537 if (index == -1) { 538 return; 539 } 540 541 unmarshallCache[index] = o; 542 } 543 544 public DataStructure getFromUnmarshallCache(short index) { 545 return unmarshallCache[index]; 546 } 547 548 public void setStackTraceEnabled(boolean b) { 549 stackTraceEnabled = b; 550 } 551 552 public boolean isStackTraceEnabled() { 553 return stackTraceEnabled; 554 } 555 556 public boolean isTcpNoDelayEnabled() { 557 return tcpNoDelayEnabled; 558 } 559 560 public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) { 561 this.tcpNoDelayEnabled = tcpNoDelayEnabled; 562 } 563 564 public boolean isCacheEnabled() { 565 return cacheEnabled; 566 } 567 568 public void setCacheEnabled(boolean cacheEnabled) { 569 if(cacheEnabled){ 570 marshallCache = new DataStructure[MARSHAL_CACHE_SIZE]; 571 unmarshallCache = new DataStructure[MARSHAL_CACHE_SIZE]; 572 } 573 this.cacheEnabled = cacheEnabled; 574 } 575 576 public boolean isTightEncodingEnabled() { 577 return tightEncodingEnabled; 578 } 579 580 public void setTightEncodingEnabled(boolean tightEncodingEnabled) { 581 this.tightEncodingEnabled = tightEncodingEnabled; 582 } 583 584 public boolean isSizePrefixDisabled() { 585 return sizePrefixDisabled; 586 } 587 588 public void setSizePrefixDisabled(boolean prefixPacketSize) { 589 this.sizePrefixDisabled = prefixPacketSize; 590 } 591 592 public void setPreferedWireFormatInfo(WireFormatInfo info) { 593 this.preferedWireFormatInfo = info; 594 } 595 596 public WireFormatInfo getPreferedWireFormatInfo() { 597 return preferedWireFormatInfo; 598 } 599 600 public long getMaxFrameSize() { 601 return maxFrameSize; 602 } 603 604 public void setMaxFrameSize(long maxFrameSize) { 605 this.maxFrameSize = maxFrameSize; 606 } 607 608 public void renegotiateWireFormat(WireFormatInfo info) throws IOException { 609 610 if (preferedWireFormatInfo == null) { 611 throw new IllegalStateException("Wireformat cannot not be renegotiated."); 612 } 613 614 this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion())); 615 info.setVersion(this.getVersion()); 616 617 this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize())); 618 info.setMaxFrameSize(this.getMaxFrameSize()); 619 620 this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled(); 621 info.setStackTraceEnabled(this.stackTraceEnabled); 622 623 this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled(); 624 info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled); 625 626 this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled(); 627 info.setCacheEnabled(this.cacheEnabled); 628 629 this.tightEncodingEnabled = info.isTightEncodingEnabled() 630 && preferedWireFormatInfo.isTightEncodingEnabled(); 631 info.setTightEncodingEnabled(this.tightEncodingEnabled); 632 633 this.sizePrefixDisabled = info.isSizePrefixDisabled() 634 && preferedWireFormatInfo.isSizePrefixDisabled(); 635 info.setSizePrefixDisabled(this.sizePrefixDisabled); 636 637 if (cacheEnabled) { 638 639 int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize()); 640 info.setCacheSize(size); 641 642 if (size == 0) { 643 size = MARSHAL_CACHE_SIZE; 644 } 645 646 marshallCache = new DataStructure[size]; 647 unmarshallCache = new DataStructure[size]; 648 nextMarshallCacheIndex = 0; 649 nextMarshallCacheEvictionIndex = 0; 650 marshallCacheMap = new HashMap<DataStructure, Short>(); 651 } else { 652 marshallCache = null; 653 unmarshallCache = null; 654 nextMarshallCacheIndex = 0; 655 nextMarshallCacheEvictionIndex = 0; 656 marshallCacheMap = null; 657 } 658 659 } 660 661 protected int min(int version1, int version2) { 662 if (version1 < version2 && version1 > 0 || version2 <= 0) { 663 return version1; 664 } 665 return version2; 666 } 667 668 protected long min(long version1, long version2) { 669 if (version1 < version2 && version1 > 0 || version2 <= 0) { 670 return version1; 671 } 672 return version2; 673 } 674}