1 /// Thread-safe atomic stream implementation using `IStream`.
2 module tern.stream.atomic_stream;
3 
4 // TODO: Serialization
5 public import tern.stream.impl;
6 import tern.typecons;
7 import tern.serialization;
8 import tern.traits;
9 import tern.object;
10 
11 /// Thread-safe implementation of `BinaryStream`.
12 public class AtomicStream : IStream
13 {
14 public:
15 final:
16 shared:
    /// Backing byte buffer that every read and write indexes into, wrapped in `Atomic`.
    Atomic!(ubyte[]) data;
    /// Current byte offset into `data` at which the next read or write takes place.
    Atomic!size_t position;
    /// Byte order handed to `makeEndian` for every value read from or written to the stream.
    Atomic!Endianness endianness;
20 
21     shared this(T)(T data, Endianness endianness = Endianness.Native)
22     {
23         if (isArray!T)
24             this.data = cast(ubyte[])data;
25         else
26             this.data = data.serialize();
27         this.endianness = endianness;
28     }
29 
30     /**
31      * Checks if there are enough elements left in the data array to read `T`.
32      *
33      * Params:
34      *  T = The type to check if can be read.
35      * 
36      * Returns:
37      *  True if there are at least `T.sizeof` bytes left to read from the current position. 
38      */
39     bool mayRead(T)()
40     {
41         return position + T.sizeof > data.length;
42     }
43 
44     /**
45      * Checks if there are enough elements left in the data array to read.
46      * 
47      * Params:
48      *  size = The number of elements to try to read. Defaults to 1.
49      *
50      * Returns:
51      *  True if there are at least size elements left to read from the current position. 
52      */
53     bool mayRead(size_t size = 1)
54     {
55         return position + size > data.length;
56     }
57 
58     /**
59      * Moves the position in the stream by the size of type T.
60      *
61      * Params:
62      *   T = The size of type to move the position by.
63      */
64     void step(T)(size_t count = 1)
65     {
66         position += T.sizeof * count;
67     }
68 
69     /** 
70      * Moves the position in the stream forward by one until `val` is peeked.
71      *
72      * Params:
73      *  val = The value to be peeked.
74      */
75     void stepUntil(T)(T val)
76     {
77         static if (isSomeString!T)
78         {
79             while (peekString!(ElementType!T) != val)
80                 position++;
81         }
82         else
83         {
84             while (peek!T != val)
85                 position++;
86         }
87     }
88 
89     /**
90      * Seeks to a new position in the stream based on the provided offset and seek direction.
91      *
92      * Params:
93      *  SEEK = The direction of the seek operation (Start, Current, or End).
94      *  offset = The offset from the seek direction to be set.
95      */
96     void seek(Seek SEEK)(size_t offset)
97     {
98         static if (SEEK = Seek.Current)
99             position += offset;
100         else static if (SEEK = Seek.Start)
101             position = offset;
102         else
103             position = data.length - offset;
104     }
105 
106     /**
107      * Reads the next value from the stream of type T.
108      *
109      * Params:
110      *   T = The type of data to be read.
111      *
112      * Returns:
113      *  The value read from the stream.
114      */
115     T read(T)()
116         if (!isDynamicArray!T)
117     {
118         if (position + T.sizeof > data.length)
119             throw new Throwable("Tried to read past the end of stream!");
120 
121         return (*cast(T*)data[position..(position += T.sizeof)].ptr).makeEndian(endianness);
122     }
123 
124     /**
125      * Peeks at the next value from the stream of type T without advancing the stream position.
126      *
127      * Params:
128      *  T = The type of data to peek.
129      *
130      * Returns:
131      *  The value peeked from the stream.
132      */
133     T peek(T)()
134         if (!isDynamicArray!T)
135     {
136         if (position + T.sizeof > data.length)
137             throw new Throwable("Tried to read past the end of stream!");
138 
139         return (*cast(T*)data[position..(position + T.sizeof)].ptr).makeEndian(endianness);
140     }
141 
142     /**
143      * Reads multiple values of type T from the stream.
144      *
145      * Params:
146      *  T = The type of data to be read.
147      *  count = The number of values to read from the stream.
148      *
149      * Returns:
150      *  An array of values read from the stream.
151      */
152     T[] read(T)(size_t count)
153         if (!isDynamicArray!T)
154     {
155         T[] arr;
156         foreach (i; 0..count)
157             arr ~= read!T;
158         return arr;
159     }
160 
161     /**
162      * Peeks at multiple values of type T from the stream without advancing the stream position.
163      *
164      * Params:
165      *  T = The type of data to peek.
166      *  count = The number of values to peek from the stream.
167      *
168      * Returns:
169      *  An array of values peeked from the stream.
170      */
171     T[] peek(T)(size_t count)
172         if (!isDynamicArray!T)
173     {
174         auto _position = position;
175         scope (exit) position = _position;
176         T[] arr;
177         foreach (i; 0..count)
178             arr ~= read!T;
179         return arr;
180     }
181 
182     /**
183      * Reads an array of type T from the stream.
184      *
185      * Params:
186      *  T = The type of data to be read.
187      *
188      * Returns:
189      *  An array read from the stream.
190      */
191     T read(T : U[], U)()
192         if (isDynamicArray!T)
193     {
194         return read!(ElementType!T)(cast(size_t)read7EncodedInt());
195     }
196 
197     /**
198      * Peeks an array of type T from the stream without advancing the stream position.
199      *
200      * Params:
201      *  T = The type of data to peek.
202      *
203      * Returns:
204      *  An array peeked from the stream.
205      */
206     T peek(T : U[], U)()
207         if (isDynamicArray!T)
208     {
209         return peek!(ElementType!T)(cast(size_t)read7EncodedInt());
210     }
211 
212     /**
213      * Writes the provided value to the stream.
214      *
215      * Params:
216      *   T = The type of data to be written.
217      *   val = The value to be written to the stream.
218      */
219     void write(T)(T val)
220     {        
221         if (position + T.sizeof > data.length)
222             throw new Throwable("Tried to write past the end of stream!");
223 
224         auto _val = val.makeEndian(endianness);
225         data[position..(position += T.sizeof)] = (cast(ubyte*)&_val)[0..T.sizeof];
226     }
227 
228     /**
229      * Writes the provided value to the stream without advancing the stream position.
230      *
231      * Params:
232      *   T = The type of data to be written.
233      *   val = The value to be written to the stream.
234      */
235     void put(T)(T val)
236     {
237         if (position + T.sizeof > data.length)
238             throw new Throwable("Tried to write past the end of stream!");
239 
240         auto _val = val.makeEndian(endianness);
241         data[position..(position + T.sizeof)] = (cast(ubyte*)&_val)[0..T.sizeof];
242     }
243 
244     /**
245      * Writes multiple values of type T to the stream.
246      *
247      * Params:
248      *   T = The type of data to be written.
249      *   items = An array of values to be written to the stream.
250      */
251     void write(T, bool PREFIXED = true)(T val)
252         if (isArray!T)
253     {
254         static if (PREFIXED)
255             write7EncodedInt(cast(uint)val.length);
256 
257         foreach (u; val)
258             write(u);
259     }
260 
261     /**
262      * Writes multiple values of type T to the stream without advancing the stream position.
263      *
264      * Params:
265      *   T = The type of data to be written.
266      *   items = An array of values to be written to the stream.
267      */
268     void put(T, bool PREFIXED = true)(T val)
269         if (isArray!T)
270     {
271         auto _position = position;
272         scope (exit) position = _position;
273         static if (PREFIXED)
274             write7EncodedInt(cast(uint)val.length);
275 
276         foreach (u; val)
277             write(u);
278     }
279 
280     /**
281      * Reads a string from the stream considering the character width and prefixing.
282      *
283      * Params:
284      *   CHAR = The character type used for reading the string (char, wchar, or dchar).
285      *   PREFIXED = Indicates whether the string is prefixed. Default is false.
286      *
287      * Returns:
288      *  The read string from the stream.
289      */
290     immutable(CHAR)[] readString(CHAR, bool PREFIXED = false)()
291     {
292         static if (PREFIXED)
293             return cast(immutable(CHAR)[])read!(immutable(CHAR)[]);
294         else
295         {
296             immutable(CHAR)[] ret;
297             while (peek!CHAR != '\0')
298                 ret ~= read!CHAR;
299             return ret;
300         }
301     }
302 
303     /**
304      * Reads a string from the stream considering the character width and prefixing without advancing the stream position.
305      *
306      * Params:
307      *   CHAR = The character type used for reading the string (char, wchar, or dchar).
308      *   PREFIXED = Indicates whether the string is prefixed. Default is false.
309      *
310      * Returns:
311      *  The read string from the stream.
312      */
313     immutable(CHAR)[] peekString(CHAR, bool PREFIXED = false)()
314     {
315         auto _position = position;
316         scope (exit) position = _position;
317         static if (PREFIXED)
318             return cast(immutable(CHAR)[])read!(immutable(CHAR)[]);
319         else
320         {
321             immutable(CHAR)[] ret;
322             while (peek!CHAR != '\0')
323                 ret ~= read!CHAR;
324             return ret;
325         }
326     }
327 
328     /**
329      * Writes a string to the stream considering the character width and prefixing.
330      *
331      * Params:
332      *   CHAR = The character type used for writing the string (char, wchar, or dchar).
333      *   PREFIXED = Indicates whether the string is prefixed. Default is false.
334      *   val = The string to be written to the stream.
335      */
336     void writeString(CHAR, bool PREFIXED = false)(immutable(CHAR)[] val)
337     {
338         static if (!PREFIXED)
339             val ~= '\0';
340 
341         write!(immutable(CHAR)[], PREFIXED)(val);
342     }
343 
344     /**
345      * Writes a string into the stream considering the character width and prefixing without advancing the stream position.
346      *
347      * Params:
348      *   CHAR = The character type used for writing the string (char, wchar, or dchar).
349      *   PREFIXED = Indicates whether the string is prefixed. Default is false.
350      *   val = The string to be put into the stream.
351      */
352     void putString(CHAR, bool PREFIXED = false)(immutable(CHAR)[] val)
353     {
354         static if (!PREFIXED)
355             val ~= '\0';
356 
357         put!(immutable(CHAR)[], PREFIXED)(val);
358     }
359 
360     /**
361      * Reads an integer value encoded in 7 bits from the stream.
362      *
363      * Returns:
364      *  The integer value read from the stream.
365      */
366     uint read7EncodedInt()
367     {
368         uint result = 0;
369         uint shift = 0;
370 
371         foreach (i; 0..5)
372         {
373             ubyte b = read!ubyte;
374             result |= cast(uint)(b & 0x7F) << shift;
375             if ((b & 0x80) == 0)
376                 return result;
377             shift += 7;
378         }
379         
380         return result;
381     }
382 
383     /**
384      * Writes an integer value encoded in 7 bits to the stream.
385      *
386      * Params:
387      *   val = The integer value to be written to the stream.
388      */
389     void write7EncodedInt(uint val)
390     {
391         foreach (i; 0..5)
392         {
393             byte b = cast(byte)(val & 0x7F);
394             val >>= 7;
395             if (val != 0)
396                 b |= 0x80;
397             write(b);
398             if (val == 0)
399                 return;
400         }
401     }
402 
403     /**
404      * Reads data from a byte stream into a structured type based on specified field names and read kinds.  
405      * Designed specifically for better control reading string and array fields.
406      *
407      * Params:
408      *   T = The type representing the structure to read into.
409      *   ARGS = Variadic template parameter representing field names and read kinds.
410      *
411      * Returns:
412      *  Returns an instance of type T with fields populated based on the specified read operations.
413      */
414     T read(T, ARGS...)()
415     {
416         T val;
417         foreach (field; Fields!T)
418         {
419             alias M = TypeOf!(val, field);
420             bool cread;
421             static foreach (i, ARG; ARGS)
422             {
423                 static if (i % 3 == 0)
424                 {
425                     static assert(is(typeof(ARG) == string),
426                         "Field name expected, found " ~ ARG.stringof);  
427                 }
428                 else static if (i % 3 == 1)
429                 {
430                     static assert(is(typeof(ARG) == ReadKind),
431                         "Read kind expected, found " ~ ARG.stringof);  
432                 }
433                 else
434                 {
435                     static if (field == ARGS[i - 2] || ARGS[i - 2] == "")
436                     {
437                         static if (!isStaticArray!M && is(M == string))
438                         {
439                             cread = true;
440                             static if (ARGS[i - 1] == ReadKind.Field)
441                             {
442                                 __traits(getMember, val, field) = read!char(__traits(getMember, val, ARG)).to!string;
443                             }
444                             else static if  (ARGS[i - 1] == ReadKind.Fixed)
445                             {
446                                 __traits(getMember, val, field) =  read!char(ARG).to!string;
447                             }
448                             else
449                             {
450                                 __traits(getMember, val, field) = readString!(char, ARG);
451                             }
452                         }
453                         else static if (!isStaticArray!M && is(M == wstring))
454                         {
455                             cread = true;
456                             static if (ARGS[i - 1] == ReadKind.Field)
457                             {
458                                 __traits(getMember, val, field) = read!wchar(__traits(getMember, val, ARG)).to!string;
459                             }
460                             else static if  (ARGS[i - 1] == ReadKind.Fixed)
461                             {
462                                 __traits(getMember, val, field) = read!wchar(ARG).to!string;
463                             }
464                             else
465                             {
466                                 __traits(getMember, val, field) = readString!(wchar, ARG);
467                             }
468                         }
469                         static if (!isStaticArray!M && is(M == dstring))
470                         {
471                             cread = true;
472                             static if (ARGS[i - 1] == ReadKind.Field)
473                             {
474                                 __traits(getMember, val, field) = read!dchar(__traits(getMember, val, ARG)).to!string;
475                             }
476                             else static if  (ARGS[i - 1] == ReadKind.Fixed)
477                             {
478                                 __traits(getMember, val, field) = read!dchar(ARG).to!string;
479                             }
480                             else
481                             {
482                                 __traits(getMember, val, field) = readString!(dchar, ARG);
483                             }
484                         }
485                         else static if (isDynamicArray!M)
486                         {
487                             cread = true;
488                             static if (ARGS[i - 1] == ReadKind.Field)
489                             {
490                                 __traits(getMember, val, field) = read!(ElementType!M)(__traits(getMember, val, ARG));
491                             }
492                             else static if  (ARGS[i - 1] == ReadKind.Fixed)
493                             {
494                                 __traits(getMember, val, field) = read!(ElementType!M)(ARG);
495                             }
496                             else
497                             {
498                                 __traits(getMember, val, field) = read!M;
499                             }
500                         }
501                     }
502                 }
503             }
504             if (!cread)
505                 __traits(getMember, val, field) = read!M;
506         }
507         return val;
508     }
509 
510     /// ditto
511     T[] read(T, ARGS...)(size_t count)
512     {
513         T[] items;
514         foreach (i; 0..count)
515             items ~= read!(T, ARGS);
516         return items;
517     }
518 
519     /**
520      * Reads a type from the stream using optional fields.
521      *
522      * Params:
523      *   T = The type to be read from the stream.
524      *   ARGS... = The arguments for optional fields.
525      *
526      * Returns:
527      *  The read type read from the stream.
528      */
529     T readPlasticized(T, ARGS...)()
530         if (ARGS.length % 3 == 0)
531     {
532         T val;
533         foreach (field; Fields!T)
534         {
535             bool cread = true;
536             static foreach (i, ARG; ARGS)
537             {
538                 static if (i % 3 == 0)
539                 {
540                     static assert(is(typeof(ARG) == string),
541                         "Field name expected, found " ~ ARG.stringof);  
542                 }
543                 else static if (i % 3 == 1)
544                 {
545                     static assert(is(typeof(ARG) == string),
546                         "Conditional field name expected, found " ~ ARG.stringof);
547                 }
548                 else
549                 {
550                     if (field == ARGS[i - 2] && __traits(getMember, val, ARGS[i - 1]) != ARG)
551                         cread = false;
552                 }
553             }
554             if (cread)
555                 __traits(getMember, val, field) = read!(TypeOf!(val, field));
556         }
557         return val;
558     }
559 }