/// Thread-safe atomic stream implementation using `IStream`.
module tern.stream.atomic_stream;

// TODO: Serialization
public import tern.stream.impl;
import tern.typecons;
import tern.serialization;
import tern.traits;
import tern.object;

/// Thread-safe implementation of `BinaryStream`.
public class AtomicStream : IStream
{
public:
final:
shared:
    /// Backing byte buffer that every read and write operates on.
    Atomic!(ubyte[]) data;
    /// Current byte offset into `data`.
    Atomic!size_t position;
    /// Endianness handed to `makeEndian` for every value read or written.
    Atomic!Endianness endianness;

    /**
     * Creates a stream over `data`.
     *
     * Params:
     *  data = Array types are reinterpreted as `ubyte[]`; every other type is passed through `serialize()`.
     *  endianness = The endianness used when reading and writing values. Defaults to `Endianness.Native`.
     */
    shared this(T)(T data, Endianness endianness = Endianness.Native)
    {
        // Must be decided at compile time: with a runtime `if` both branches are
        // compiled for every `T`, and the cast does not compile for non-array types.
        static if (isArray!T)
            this.data = cast(ubyte[])data;
        else
            this.data = data.serialize();
        this.endianness = endianness;
    }

    /**
     * Checks if there are enough elements left in the data array to read `T`.
     *
     * Params:
     *  T = The type to check if can be read.
     *
     * Returns:
     *  True if there are at least `T.sizeof` bytes left to read from the current position.
     */
    bool mayRead(T)()
    {
        return position + T.sizeof <= data.length;
    }

    /**
     * Checks if there are enough elements left in the data array to read.
     *
     * Params:
     *  size = The number of elements to try to read. Defaults to 1.
     *
     * Returns:
     *  True if there are at least size elements left to read from the current position.
     */
    bool mayRead(size_t size = 1)
    {
        return position + size <= data.length;
    }

    /**
     * Moves the position in the stream by the size of type T.
     *
     * Params:
     *  T = The size of type to move the position by.
     *  count = How many `T`-sized steps to move forward. Defaults to 1.
     */
    void step(T)(size_t count = 1)
    {
        position += T.sizeof * count;
    }

    /**
     * Moves the position in the stream forward by one until `val` is peeked.
     *
     * Params:
     *  val = The value to be peeked.
     *
     * Throws:
     *  Whatever `peek` or `peekString` throws when the end of the stream is
     *  reached before `val` is found.
     */
    void stepUntil(T)(T val)
    {
        static if (isSomeString!T)
        {
            while (peekString!(ElementType!T) != val)
                position++;
        }
        else
        {
            while (peek!T != val)
                position++;
        }
    }

    /**
     * Seeks to a new position in the stream based on the provided offset and seek direction.
     *
     * Params:
     *  SEEK = The direction of the seek operation (Start, Current, or End).
     *  offset = The offset from the seek direction to be set.
     */
    void seek(Seek SEEK)(size_t offset)
    {
        // Comparison, not assignment: `SEEK` is a compile-time constant.
        static if (SEEK == Seek.Current)
            position += offset;
        else static if (SEEK == Seek.Start)
            position = offset;
        else
            position = data.length - offset;
    }

    /**
     * Reads the next value from the stream of type T.
     *
     * Params:
     *  T = The type of data to be read.
     *
     * Returns:
     *  The value read from the stream.
     *
     * Throws:
     *  `Throwable` if fewer than `T.sizeof` bytes remain.
     */
    T read(T)()
        if (!isDynamicArray!T)
    {
        if (position + T.sizeof > data.length)
            throw new Throwable("Tried to read past the end of stream!");

        // The slice's upper bound advances the position as a side effect.
        return (*cast(T*)data[position..(position += T.sizeof)].ptr).makeEndian(endianness);
    }

    /**
     * Peeks at the next value from the stream of type T without advancing the stream position.
     *
     * Params:
     *  T = The type of data to peek.
     *
     * Returns:
     *  The value peeked from the stream.
     *
     * Throws:
     *  `Throwable` if fewer than `T.sizeof` bytes remain.
     */
    T peek(T)()
        if (!isDynamicArray!T)
    {
        if (position + T.sizeof > data.length)
            throw new Throwable("Tried to read past the end of stream!");

        return (*cast(T*)data[position..(position + T.sizeof)].ptr).makeEndian(endianness);
    }

    /**
     * Reads multiple values of type T from the stream.
     *
     * Params:
     *  T = The type of data to be read.
     *  count = The number of values to read from the stream.
     *
     * Returns:
     *  An array of values read from the stream.
     */
    T[] read(T)(size_t count)
        if (!isDynamicArray!T)
    {
        T[] arr;
        foreach (i; 0..count)
            arr ~= read!T;
        return arr;
    }

    /**
     * Peeks at multiple values of type T from the stream without advancing the stream position.
     *
     * Params:
     *  T = The type of data to peek.
     *  count = The number of values to peek from the stream.
     *
     * Returns:
     *  An array of values peeked from the stream.
     */
    T[] peek(T)(size_t count)
        if (!isDynamicArray!T)
    {
        // Read normally, then rewind on every exit path (including a throw).
        auto _position = position;
        scope (exit) position = _position;
        T[] arr;
        foreach (i; 0..count)
            arr ~= read!T;
        return arr;
    }

    /**
     * Reads an array of type T from the stream.
     * The element count is taken from a 7-bit encoded integer prefix.
     *
     * Params:
     *  T = The type of data to be read.
     *
     * Returns:
     *  An array read from the stream.
     */
    T read(T : U[], U)()
        if (isDynamicArray!T)
    {
        return read!(ElementType!T)(cast(size_t)read7EncodedInt());
    }

    /**
     * Peeks an array of type T from the stream without advancing the stream position.
     * The element count is taken from a 7-bit encoded integer prefix.
     *
     * Params:
     *  T = The type of data to peek.
     *
     * Returns:
     *  An array peeked from the stream.
     */
    T peek(T : U[], U)()
        if (isDynamicArray!T)
    {
        // The position has to be saved before the length prefix is consumed,
        // otherwise the stream is left advanced past the prefix.
        auto _position = position;
        scope (exit) position = _position;
        return read!(ElementType!T)(cast(size_t)read7EncodedInt());
    }

    /**
     * Writes the provided value to the stream.
     *
     * Params:
     *  T = The type of data to be written.
     *  val = The value to be written to the stream.
     *
     * Throws:
     *  `Throwable` if fewer than `T.sizeof` bytes remain; the buffer is never grown.
     */
    void write(T)(T val)
        if (!isDynamicArray!T)
    {
        if (position + T.sizeof > data.length)
            throw new Throwable("Tried to write past the end of stream!");

        auto _val = val.makeEndian(endianness);
        // The slice's upper bound advances the position as a side effect.
        data[position..(position += T.sizeof)] = (cast(ubyte*)&_val)[0..T.sizeof];
    }

    /**
     * Writes the provided value to the stream without advancing the stream position.
     *
     * Params:
     *  T = The type of data to be written.
     *  val = The value to be written to the stream.
     *
     * Throws:
     *  `Throwable` if fewer than `T.sizeof` bytes remain; the buffer is never grown.
     */
    void put(T)(T val)
        if (!isDynamicArray!T)
    {
        if (position + T.sizeof > data.length)
            throw new Throwable("Tried to write past the end of stream!");

        auto _val = val.makeEndian(endianness);
        data[position..(position + T.sizeof)] = (cast(ubyte*)&_val)[0..T.sizeof];
    }

    /**
     * Writes multiple values of type T to the stream.
     *
     * Params:
     *  T = The type of data to be written.
     *  PREFIXED = Whether the element count is written first as a 7-bit encoded integer. Default is true.
     *  val = An array of values to be written to the stream.
     */
    void write(T, bool PREFIXED = true)(T val)
        if (isArray!T)
    {
        static if (PREFIXED)
            write7EncodedInt(cast(uint)val.length);

        foreach (u; val)
            write(u);
    }

    /**
     * Writes multiple values of type T to the stream without advancing the stream position.
     *
     * Params:
     *  T = The type of data to be written.
     *  PREFIXED = Whether the element count is written first as a 7-bit encoded integer. Default is true.
     *  val = An array of values to be written to the stream.
     */
    void put(T, bool PREFIXED = true)(T val)
        if (isArray!T)
    {
        // Write normally, then rewind on every exit path (including a throw).
        auto _position = position;
        scope (exit) position = _position;
        static if (PREFIXED)
            write7EncodedInt(cast(uint)val.length);

        foreach (u; val)
            write(u);
    }

    /**
     * Reads a string from the stream considering the character width and prefixing.
     *
     * When not prefixed, characters are read until a `'\0'` is peeked. The
     * terminator itself is not consumed, so the position is left on it.
     *
     * Params:
     *  CHAR = The character type used for reading the string (char, wchar, or dchar).
     *  PREFIXED = Indicates whether the string is prefixed. Default is false.
     *
     * Returns:
     *  The read string from the stream.
     */
    immutable(CHAR)[] readString(CHAR, bool PREFIXED = false)()
    {
        static if (PREFIXED)
            return cast(immutable(CHAR)[])read!(immutable(CHAR)[]);
        else
        {
            immutable(CHAR)[] ret;
            while (peek!CHAR != '\0')
                ret ~= read!CHAR;
            return ret;
        }
    }

    /**
     * Reads a string from the stream considering the character width and prefixing without advancing the stream position.
     *
     * Params:
     *  CHAR = The character type used for reading the string (char, wchar, or dchar).
     *  PREFIXED = Indicates whether the string is prefixed. Default is false.
     *
     * Returns:
     *  The read string from the stream.
     */
    immutable(CHAR)[] peekString(CHAR, bool PREFIXED = false)()
    {
        // Read normally, then rewind on every exit path (including a throw).
        auto _position = position;
        scope (exit) position = _position;
        static if (PREFIXED)
            return cast(immutable(CHAR)[])read!(immutable(CHAR)[]);
        else
        {
            immutable(CHAR)[] ret;
            while (peek!CHAR != '\0')
                ret ~= read!CHAR;
            return ret;
        }
    }

    /**
     * Writes a string to the stream considering the character width and prefixing.
     * When not prefixed, a `'\0'` terminator is appended before writing.
     *
     * Params:
     *  CHAR = The character type used for writing the string (char, wchar, or dchar).
     *  PREFIXED = Indicates whether the string is prefixed. Default is false.
     *  val = The string to be written to the stream.
     */
    void writeString(CHAR, bool PREFIXED = false)(immutable(CHAR)[] val)
    {
        static if (!PREFIXED)
            val ~= '\0';

        write!(immutable(CHAR)[], PREFIXED)(val);
    }

    /**
     * Writes a string into the stream considering the character width and prefixing without advancing the stream position.
     * When not prefixed, a `'\0'` terminator is appended before writing.
     *
     * Params:
     *  CHAR = The character type used for writing the string (char, wchar, or dchar).
     *  PREFIXED = Indicates whether the string is prefixed. Default is false.
     *  val = The string to be put into the stream.
     */
    void putString(CHAR, bool PREFIXED = false)(immutable(CHAR)[] val)
    {
        static if (!PREFIXED)
            val ~= '\0';

        put!(immutable(CHAR)[], PREFIXED)(val);
    }

    /**
     * Reads an integer value encoded in 7 bits from the stream.
     *
     * At most five bytes are read. Each byte contributes its low seven bits,
     * least significant group first, and a clear high bit ends the value.
     *
     * Returns:
     *  The integer value read from the stream.
     */
    uint read7EncodedInt()
    {
        uint result = 0;
        uint shift = 0;

        foreach (i; 0..5)
        {
            ubyte b = read!ubyte;
            result |= cast(uint)(b & 0x7F) << shift;
            if ((b & 0x80) == 0)
                return result;
            shift += 7;
        }

        return result;
    }

    /**
     * Writes an integer value encoded in 7 bits to the stream.
     *
     * At most five bytes are written, least significant seven bits first.
     * The high bit of a byte is set while more bits remain.
     *
     * Params:
     *  val = The integer value to be written to the stream.
     */
    void write7EncodedInt(uint val)
    {
        foreach (i; 0..5)
        {
            byte b = cast(byte)(val & 0x7F);
            val >>= 7;
            if (val != 0)
                b |= 0x80;
            write(b);
            if (val == 0)
                return;
        }
    }

    /**
     * Reads data from a byte stream into a structured type based on specified field names and read kinds.
     * Designed specifically for better control reading string and array fields.
     *
     * `ARGS` is consumed in groups of three: a field name (an empty name matches
     * every field), a `ReadKind`, and an argument. For `ReadKind.Field` the
     * argument names the member of the value that holds the element count, for
     * `ReadKind.Fixed` it is the element count itself, and for any other kind it
     * is passed to `readString` as its `PREFIXED` flag for string fields.
     * Fields that no group handles are read with `read!M`.
     *
     * Params:
     *  T = The type representing the structure to read into.
     *  ARGS = Variadic template parameter representing field names and read kinds.
     *
     * Returns:
     *  Returns an instance of type T with fields populated based on the specified read operations.
     */
    T read(T, ARGS...)()
    {
        T val;
        foreach (field; Fields!T)
        {
            alias M = TypeOf!(val, field);
            // Set once a group of ARGS has read this field.
            bool cread;
            static foreach (i, ARG; ARGS)
            {
                static if (i % 3 == 0)
                {
                    static assert(is(typeof(ARG) == string),
                        "Field name expected, found " ~ ARG.stringof);
                }
                else static if (i % 3 == 1)
                {
                    static assert(is(typeof(ARG) == ReadKind),
                        "Read kind expected, found " ~ ARG.stringof);
                }
                else
                {
                    static if (field == ARGS[i - 2] || ARGS[i - 2] == "")
                    {
                        // A single chain, so exactly one branch reads the field.
                        static if (!isStaticArray!M && is(M == string))
                        {
                            cread = true;
                            static if (ARGS[i - 1] == ReadKind.Field)
                            {
                                __traits(getMember, val, field) = read!char(__traits(getMember, val, ARG)).to!string;
                            }
                            else static if (ARGS[i - 1] == ReadKind.Fixed)
                            {
                                __traits(getMember, val, field) = read!char(ARG).to!string;
                            }
                            else
                            {
                                __traits(getMember, val, field) = readString!(char, ARG);
                            }
                        }
                        else static if (!isStaticArray!M && is(M == wstring))
                        {
                            cread = true;
                            static if (ARGS[i - 1] == ReadKind.Field)
                            {
                                __traits(getMember, val, field) = read!wchar(__traits(getMember, val, ARG)).to!wstring;
                            }
                            else static if (ARGS[i - 1] == ReadKind.Fixed)
                            {
                                __traits(getMember, val, field) = read!wchar(ARG).to!wstring;
                            }
                            else
                            {
                                __traits(getMember, val, field) = readString!(wchar, ARG);
                            }
                        }
                        else static if (!isStaticArray!M && is(M == dstring))
                        {
                            cread = true;
                            static if (ARGS[i - 1] == ReadKind.Field)
                            {
                                __traits(getMember, val, field) = read!dchar(__traits(getMember, val, ARG)).to!dstring;
                            }
                            else static if (ARGS[i - 1] == ReadKind.Fixed)
                            {
                                __traits(getMember, val, field) = read!dchar(ARG).to!dstring;
                            }
                            else
                            {
                                __traits(getMember, val, field) = readString!(dchar, ARG);
                            }
                        }
                        else static if (isDynamicArray!M)
                        {
                            cread = true;
                            static if (ARGS[i - 1] == ReadKind.Field)
                            {
                                __traits(getMember, val, field) = read!(ElementType!M)(__traits(getMember, val, ARG));
                            }
                            else static if (ARGS[i - 1] == ReadKind.Fixed)
                            {
                                __traits(getMember, val, field) = read!(ElementType!M)(ARG);
                            }
                            else
                            {
                                __traits(getMember, val, field) = read!M;
                            }
                        }
                    }
                }
            }
            if (!cread)
                __traits(getMember, val, field) = read!M;
        }
        return val;
    }

    /// ditto
    T[] read(T, ARGS...)(size_t count)
    {
        T[] items;
        foreach (i; 0..count)
            items ~= read!(T, ARGS);
        return items;
    }

    /**
     * Reads a type from the stream using optional fields.
     *
     * `ARGS` is consumed in groups of three: a field name, a conditional field
     * name, and a value. A field is skipped when the conditional field of the
     * value being built does not equal the given value at the time the field
     * is reached.
     * NOTE(review): this presumably requires the conditional field to be read
     * before the field it guards; confirm against the order of `Fields!T`.
     *
     * Params:
     *  T = The type to be read from the stream.
     *  ARGS... = The arguments for optional fields.
     *
     * Returns:
     *  The read type read from the stream.
     */
    T readPlasticized(T, ARGS...)()
        if (ARGS.length % 3 == 0)
    {
        T val;
        foreach (field; Fields!T)
        {
            bool cread = true;
            static foreach (i, ARG; ARGS)
            {
                static if (i % 3 == 0)
                {
                    static assert(is(typeof(ARG) == string),
                        "Field name expected, found " ~ ARG.stringof);
                }
                else static if (i % 3 == 1)
                {
                    static assert(is(typeof(ARG) == string),
                        "Conditional field name expected, found " ~ ARG.stringof);
                }
                else
                {
                    if (field == ARGS[i - 2] && __traits(getMember, val, ARGS[i - 1]) != ARG)
                        cread = false;
                }
            }
            if (cread)
                __traits(getMember, val, field) = read!(TypeOf!(val, field));
        }
        return val;
    }
}