1: #!/usr/bin/perl
2: =foo
3: Update: added rules
Update: on my Darwin machine I set up the relevant rule with
5:
6: ipfw add 65500 divert 65500 tcp from me to any via en0
7:
Replace en0 with your interface of choice, naturally. I haven't set this up on
Linux (yet), but I know divert sockets were ported at some point...
10: ciao ciao!
11:
This little script was hell to write, because every time I didn't get the
firewall rule right and had a fix for the script, my netinfo server
couldn't be contacted... this meant that I couldn't run
15:
16: ipfw flush
17:
Regardless, I've finally made it. It now prioritises using a size-based
queue, which is really nothing compared to what you can do. Surprisingly,
it's not a resource hog, and there's still some stuff to implement, such
as timeout-based dropping using a journal (the purger is written, but
not the logger), and priority queue clipping using mapl's janus heaps...
23:
I guess I'll prioritize the cmp sub so that it puts TCP SYN packets,
non-piggybacked ACKs, as well as short ICMP and UDP packets at the top
automatically. Then I'll prioritise by service...
27:
So far, only the heap has made BitTorrent and PerlMonks chat possible at
the same time, even when running several BitTorrent clients at once... I must say I
was surprised.
31:
32: Replace "#print" with ";print" for debugging info.
33:
34: Unresolved bug: within the SIGIO handler recv will sometimes return
35: undefined, but the loop check doesn't pick it up, so pack complains. I
36: have no idea why.
37:
38: Bah. Enough yak, here's the stuff:
39: =cut
40:
41: use strict;
42: use warnings;
43:
44: use Socket;
45: use Time::HiRes qw(time sleep alarm);
46: use Fcntl qw(F_SETOWN F_GETFL F_SETFL O_ASYNC O_NONBLOCK);
47: use Array::Heap;
#use Array::Heap2; # same thing, with a different namespace
49: use NetPacket::IP qw(IP_PROTO_ICMP IP_PROTO_TCP IP_PROTO_UDP);
50: use NetPacket::TCP;
51: use NetPacket::UDP;
52: use NetPacket::ICMP;
53: use Scalar::Util qw(dualvar);
54:
55: ### KNOWN BUGS
56: # this code is very dirty and hackish, may be resolved in the future
57:
58: ### CHANGES
59: # added real rules to order the heaps. A simple API and examples at the bottom...
60: # currently aggressively user oriented, with a bit of space for standard unixish servers
61:
62: ### TODO
# score changes dynamically based on time... Instability in the heap could
# be negligible at this point, as TCP is fault tolerant and UDP
# shouldn't care. ICMP should have a high enough priority.
66: #
67: # purge using timeout, not just by clipping the queue
68: #
# both of these are theoretically solvable using references within the queue,
# so that you could undef and change the value of the referents (thingys) in
# the heap itself. I wonder if make_heap instead of push_heap would be much worse.
# This way we could take a reference to an element in the heap.
73:
74: # aggressive purging minimizes wasted bandwidth:
75: # have an x second maximum queue life (to be implemented)
76: # purge heap size:
77: # if (overflow){
78: # reverse order
79: # reverse heapify
80: # reverse pop
81: # reverse order
82: # }
83: #
84: # push
85:
86:
# user config
use constant PORT                    => 65500;    # the divert port to bind on
use constant WANTED_BYTES_PER_SECOND => 94 * 128; # cap ( * 128 for kilobits, * 1024 for kilobytes )... divided by ERROR below
use constant SLEEP_INTERVAL          => 0.05;
    ## SLEEP_INTERVAL: smaller = smoother, more overhead. For some perspective:
    ## 0.01 is usually the size of a time slice on unices... (linux lets you get picky)
    ## 0.03 gives good response on ~10K, 0.05 on 100K and more. The question is really
    ## how many packets do you burst before delaying for them all. If you send 100 packets
    ## a second the overhead of calling sleep will cause a significant delay
use constant PURGE_INTERVAL          => 2;        ## how often packets are purged
use constant PURGE_TIMEOUT           => 20;       ## how long a packet can live in the queue... only as accurate as PURGE_INTERVAL
use constant QUEUE_LIMIT             => 32;       ## how many packets are allowed to be in the queue at any given time

# derived / fixed constants (each one may only use constants declared above it)
use constant ERROR            => 0.996;                                # clipped average on my machine, for 12 * 1024, 64 * 1024, 128 * 1024
use constant BYTES_PER_SECOND => WANTED_BYTES_PER_SECOND / ERROR;      # the cap, corrected by the measured ERROR
use constant SLICE            => BYTES_PER_SECOND * SLEEP_INTERVAL;    # a slice of data corresponding to SLEEP_INTERVAL
use constant NOQUEUE          => O_ASYNC | O_NONBLOCK;                 # fcntl status flags used while draining the queue
use constant PACK_QUEUE       => "a16 a*";                             # queued packet: 16-byte address from recv, then the packet
use constant PACK_DEPT        => "d S";                                # debt journal entry: (send time, length)
    ## PACK_DEPT alternatives -- F: less overhead, inaccurate. d: accurate, more overhead,
    ## D: less overhead, accurate, needs long double support (perl -V).
use constant PURGE_MARKERS    => int(PURGE_TIMEOUT / PURGE_INTERVAL);  # number of purge-journal slots
108:
# variables

# Debt accounting: $dept is the number of re-injected bytes whose transmission
# time at BYTES_PER_SECOND has not yet elapsed; @dept journals every send as
# pack(PACK_DEPT, time, length) so the main loop can pay the debt off.
my ($dept, @dept) = (0);

# Priority queue state: @queue holds the queued packets (a heap) and $qcnt
# counts them. @qjournal holds PURGE_MARKERS undef markers for the
# timeout-based purger, which is currently disabled (commented out below).
my $qcnt     = 0;
my @qjournal = ((undef) x PURGE_MARKERS);
my @queue;

#print "Initializing...\n";
#print "SLICE=", SLICE, "\n";

# The scoring rules queue() uses to order the heap. Class-method syntax
# instead of the ambiguous indirect-object form "new Rules".
my $rules = Rules->new;
install_rules($rules);
119:
120:
### hackery for priority queue implementation
122:
# queue(@packets)
#
# Adds packed packets (PACK_QUEUE: the 16-byte address returned by recv,
# followed by the packet itself) to the priority queue @queue. Each packet is
# scored by $rules on the part after the address. It is stored as a dualvar:
# the numeric value (the score) orders the heap, and the string value (the
# packed record) is what dequeue()'s caller unpacks. If the queue then holds
# more than QUEUE_LIMIT packets, the worst-scored (highest) ones are dropped.
# $qcnt is set to the resulting heap size.
sub queue (@) {
    my @packets = @_;

    foreach my $packet (@packets) {
        $packet = dualvar $rules->evaluate(substr($packet, 16)), $packet;
        # TODO: add journal entry (for timeout-based purging)
    }

    # Insert first. Clipping then also considers the new packets (they may
    # well be the worst ones), and the limit is checked against the real heap
    # size instead of a counter that included packets not yet pushed.
    push_heap @queue, @packets;

    if (@queue > QUEUE_LIMIT) {
        #print "queue has exceeded limit, clipping\n";
        # Re-heapify as a max-heap so the worst scores come off the top and
        # drop them. Then rebuild the min-heap that push_heap/pop_heap rely on.
        # (Reversing the array afterwards, as before, does not yield a valid
        # min-heap.)
        make_heap_cmp { $b <=> $a } @queue;
        while (@queue > QUEUE_LIMIT) {
            pop_heap_cmp { $b <=> $a } @queue;
        }
        make_heap @queue;
    }

    $qcnt = @queue;
    return;
}
145:
# dequeue()
#
# Removes and returns the lowest-scored entry of the heap @queue. That entry
# is a dualvar whose string value is a PACK_QUEUE record. Returns undef if the
# queue is empty. $qcnt is set to the number of packets left.
sub dequeue () {
    my $top;
    $top = pop_heap @queue if @queue;   # never pop, or count down, an empty heap
    $qcnt = @queue;
    return $top;
}
150:
151: ### hackery ends
152:
153:
## set up the divert socket

# The divert protocol number comes from the protocols database. Check it:
# an undef here would silently be passed to socket() as protocol 0.
my $divert_proto = getprotobyname("divert");
defined $divert_proto
    or die "cannot resolve protocol 'divert' with getprotobyname (not in the protocols database?)\n";

socket D, PF_INET, SOCK_RAW, $divert_proto or die "socket(divert): $!";
bind D, sockaddr_in(PORT, INADDR_ANY) or die "bind(divert port " . PORT . "): $!";

# Deliver this socket's SIGIO to this process (used while the queue drains).
fcntl D, F_SETOWN, $$ or die "fcntl(F_SETOWN): $!";
#print "fcntl returned async ", ((fcntl D, F_GETFL, 0 & NOQUEUE) ? "on" : "off"), "\n";
159:
160: #$SIG{ALRM} = sub {
161: # if ($qcnt){
162: # while (defined $qjournal[0]){
163: # undef ${shift @qjournal}; # undefine the reference
164: # if (--$qcnt == 1){
165: # @queue = ();
166: # @qjournal = ((undef) x PURGE_MARKERS);
167: # return;
168: # }
169: # }; shift @qjournal; # take out one marker
170: # push @qjournal, undef; # put another in
171: # }
172: #};
173: #alarm PURGE_INTERVAL,PURGE_INTERVAL; # purge old packets every n
174:
## SIGIO handler. The main loop enables async, non-blocking mode (NOQUEUE)
## only while it drains the queue. During that time every packet arriving on
## the divert socket is read here and pushed onto the priority queue.
$SIG{IO} = sub {
    #print time(), " SIGIO: queuing up\n";
    local $!;    # don't clobber errno for the code this signal interrupted

    # Perl runs signal handlers at the next safe point, so a SIGIO raised just
    # before the main loop switches back to blocking mode can be handled
    # after the switch. A blocking recv would then wait inside this handler
    # for the next packet, so in that case leave the data for the main loop's
    # own recv.
    my $flags = fcntl D, F_GETFL, 0;
    return unless defined $flags and $flags & O_NONBLOCK;

    # Read until the socket is empty rather than once per signal. Several
    # packets are usually pending per SIGIO (3-4 at a 128 * 1024 cap, on the
    # author's machine), so this saves context switching under load.
    # The buffers are fresh lexicals on every call. With shared variables, a
    # re-entered handler could leave them undef under an outer call between
    # its recv and its pack, which is presumably the cause of the "recv
    # returns undef" bug described in the header.
    while (1) {
        my $packet;
        my $from = recv D, $packet, 65535, 0;
        last unless defined $from;
        queue(pack(PACK_QUEUE, $from, $packet));
    }
};
186:
#print "Initialization complete, starting read loop\n";

# Main loop. It works in two modes.
#  * Pass-through: the queue is empty and the outstanding debt ($dept) is
#    below SLICE bytes, so each packet read by the blocking recv is
#    re-injected immediately.
#  * Draining: once a packet has been queued, the socket is switched to
#    async, non-blocking mode (NOQUEUE) so the SIGIO handler queues new
#    arrivals. The heap is then emptied one packet at a time, sleeping off
#    the debt between packets so output stays near BYTES_PER_SECOND. When the
#    heap is empty the socket goes back to blocking mode and pass-through
#    resumes.
my ($to, $t, $s, $l);
#my ($start, $end, $total); # used to compute ERROR
PACKET: {
    # Blocking read; the queue is empty here. $to is re-read for every packet
    # because packets can come from various firewall rules, and it is handed
    # back to send() unchanged. Hack at it if it ticks you off.
    if (defined($to = recv D, $_, 65535, 0)) {
        #print time(), " received packet\n";
        #print "received: " . length($to) . "\n";
        if ($dept < SLICE) {
            # Under budget: re-inject now and journal the debt it incurs.
            #print time(), " dept is $dept - short circuited, should take ", length($_) / BYTES_PER_SECOND, " seconds to deliver\n";
            send D, $_, 0, $to;
            $dept += length($_);
            push @dept, pack(PACK_DEPT, time(), length($_));
            redo PACKET;
        }
        else {
            #print time(), " queued (too much dept: $dept)\n";
            queue(pack(PACK_QUEUE, $to, $_)); # pack is about 1.5 times faster than refs (benchmark)
        }

        # The queue is not empty, or the debt needs paying off.
        #print time(), " clearing up queue\n";

        # Switch to async, non-blocking mode so the SIGIO handler queues new arrivals.
        fcntl D, F_SETFL, ((fcntl D, F_GETFL, 0) | NOQUEUE);
        #print "fcntl is now noqueue ", ((fcntl D, F_GETFL, 0 & NOQUEUE) ? "on" : "off"), "\n";

        # use to compute ERROR
        #$start = time;
        #$total = 0;

        # Drain until the heap itself is empty. Testing @queue rather than the
        # $qcnt counter keeps this correct even if the counter ever drifts.
        while (@queue) {
            # Pay off debt: forget journal entries whose transmission time has
            # elapsed, and sleep while the remaining debt exceeds one SLICE.
            # Repeat after each sleep, because a signal (SIGIO) can cut it short.
            do {
                #print time(), " cleaning out and making up for dept\n";
                $t = time;
                for (my $i = 0; $i < @dept; $i++) {
                    defined $dept[$i] or next;
                    ($s, $l) = unpack(PACK_DEPT, $dept[$i]);
                    #print time(), " diff is ", time - $s, ", ", ($l / BYTES_PER_SECOND)," diff needed queue length is $#queue, dept joural has $#dept entries ($dept)\n";
                    if ($t > $s + ($l / BYTES_PER_SECOND)) {
                        $dept -= $l;
                        undef $dept[$i];   # O(1) and index-stable (delete on array elements is discouraged)
                    }
                }
                shift @dept while @dept and not defined $dept[0]; # trim the paid-off head of the journal
                #print time(), " dept is now $dept\n";
                #print time(), " will sleep for ", $dept / BYTES_PER_SECOND,"\n" if $dept > SLICE;
            } while (($dept > SLICE) and sleep $dept / BYTES_PER_SECOND);
            #print time(), " dept is now $dept, flushing a packet\n";

            my ($to, $p) = unpack(PACK_QUEUE, dequeue());
            $dept += length($p);
            push @dept, pack(PACK_DEPT, time(), length($p));
            #$total += length($p); used to compute ERROR
            #print time(), " sent one from queue, dept is now $dept, should take ", length($p) / BYTES_PER_SECOND, " seconds to deliver (queue left: $#queue)\n";
            send D, $p, 0, $to;

            # If more is queued, restart the drain body directly (skipping the
            # loop test). Otherwise return to blocking, synchronous mode. The
            # while test then runs once more, so a packet the SIGIO handler
            # queued just before the switch is still drained rather than left
            # behind short-circuited traffic.
            # Clear only the NOQUEUE bits: "& ~NOQUEUE" (bitwise complement).
            # The former "& !NOQUEUE" (logical not) cleared every flag.
            redo if @queue;
            fcntl D, F_SETFL, ((fcntl D, F_GETFL, 0) & ~NOQUEUE);
            #print "fcntl is now noqueue ", ((fcntl D, F_GETFL, 0 & NOQUEUE) ? "on" : "off"), "\n";
        }

        # use this code to determine ERROR
        #$end = time;
        #my $bps = ($total/($end-$start));
        # print "during high load we sent $total bytes in ", $end-$start, " seconds, which means ", $bps, " bytes per second.\n";
        # print "the ratio of actual rate versus cap is ", $bps/BYTES_PER_SECOND, "\n";

        #print time(), " queue empty, returned to synchronious IO\n";
    }

    # The queue is empty (or recv failed): wait for the next packet.
    # NOTE(review): a persistent recv error makes this retry in a tight loop.
    redo PACKET;
}

1; # Keep your mother happy.
267:
# install_rules($rules)
#
# Installs the packet-scoring rules into the Rules object $rules.
#  * The first five are dependency rules. They decode the packet into shared
#    data: "basic", "ip", and one of "tcp"/"udp"/"icmp".
#  * The rest return numeric score adjustments that Rules::evaluate sums.
# queue() orders its heap on that sum and the lowest is sent first, so a
# negative adjustment means "send sooner" and a positive one "send later".
sub install_rules { ## the rules
    $_[0]->install(
        ### DEPENDENCIES
        Rule::Dependancy::Simple->new({ # basic (network unrelated) data
            provides => ["basic"],
            evaluate => sub {
                my $packet = shift;
                my $basic  = Dependancy::Simple->new;

                $basic->set("size", length($packet));

                return { basic => $basic };
            },
        }),

        Rule::Dependancy::Simple->new({ # ip packet data
            provides => ["ip"],
            evaluate => sub { return { ip => NetPacket::IP->decode($_[0]) } },
        }),

        Rule::Dependancy::Simple->new({ # tcp packet data (provides nothing for non-tcp packets)
            needs    => ["ip"],
            provides => ["tcp"],
            evaluate => sub {
                #print "providing tcp packet dependancy\n";
                ##print "got packet: ", unpack("H*",$_[0]), "\n";
                ##print "Available dependancies:\n\n", do { use Data::Dumper; Dumper $_[1] },"\n";

                return $_[1]{ip}{proto} == IP_PROTO_TCP
                    ? { tcp => NetPacket::TCP->decode($_[1]{ip}{data}) }
                    : {};
            },
        }),

        Rule::Dependancy::Simple->new({ # udp packet data (provides nothing for non-udp packets)
            needs    => ["ip"],
            provides => ["udp"],
            evaluate => sub {
                return $_[1]{ip}{proto} == IP_PROTO_UDP
                    ? { udp => NetPacket::UDP->decode($_[1]{ip}{data}) }
                    : {};
            },
        }),

        Rule::Dependancy::Simple->new({ # icmp packet data (provides nothing for non-icmp packets)
            needs    => ["ip"],
            provides => ["icmp"],
            evaluate => sub {
                return $_[1]{ip}{proto} == IP_PROTO_ICMP
                    ? { icmp => NetPacket::ICMP->decode($_[1]{ip}{data}) }
                    : {};
            },
        }),

        ### RULES
        Rule::Simple->new({ # handle Type of Service et cetera (delay += 8, throughput += 5, reliability += 4, cost += 1, congestion += 2)
            needs    => ["ip"],
            evaluate => sub { 0 }, # placeholder: not implemented yet
        }),

        Rule::Simple->new({ # packet size: larger packets are sent later
            needs    => ["basic"],
            evaluate => sub {
                #print "evaluating size based score adjustment\n";
                # "size" already is the packet length in bytes. Applying
                # length() to it again would score its number of decimal
                # digits instead of the size.
                my $size = $_[1]{basic}->get("size");
                return $size ? 1.5 * log($size) : 0;
            },
        }),

        Rule::Simple->new({ # tcp window size: larger advertised windows are sent later
            needs    => ["tcp"],
            evaluate => sub {
                #print "evaluating window size score adjustment\n";
                my $winsize = $_[1]{tcp}{winsize};
                return $winsize ? 0.1 * log($winsize) : 0;
            },
        }),

        Rule::Simple->new({ # icmp conditional
            needs    => ["icmp"],
            evaluate => sub {
                #print "packet is icmp, -20\n";
                return -20;
            },
        }),

        Rule::Simple->new({ # udp conditional
            needs    => ["udp"],
            evaluate => sub {
                #print "packet is UDP, -6\n";
                return -6;
            },
        }),

        Rule::Simple->new({ # tcp flags
            needs    => ["tcp"],
            evaluate => sub {
                #print "evaluating tcp flags\n";
                my $flags = $_[1]{tcp}{flags};

                my $ret = 0;

                # tcp messages with special information have varying degrees of additional importance
                $ret -= 1  if $flags & FIN;
                $ret -= 8  if $flags & SYN;
                $ret -= 20 if $flags & RST; # attempt to help prevent waste by pushing as fast as possible. They're pretty rare anyway
                $ret -= 5  if $flags & PSH;
                $ret -= 2  if $flags & ACK; # packets without acks aren't as urgent
                $ret -= 20 if $flags & URG;
                # $ret += 0 if $flags & ECE;
                # $ret += 0 if $flags & CWR;
                #print "final score: $ret\n";
                return $ret;
            },
        }),

        Rule::Simple->new({ # generic (udp, tcp) port handling
            wants    => ["tcp", "udp"], # a packet has either tcp or udp data
            evaluate => sub {
                #print "evaluating port rules\n";
                my $prot = (exists $_[1]->{tcp}) ? $_[1]{tcp} : $_[1]{udp};

                my $ret = 0;

                my $src = $prot->{src_port};
                my $dst = $prot->{dest_port};

                #print "ports: dest=$dst, src=$src\n";

                SWITCH: { # source port
                    # unprivileged ports (1024 and up)
                    $src >= 1024 and do {
                        $ret += 2;

                        #print "source port is unprivileged\n";

                        $ret += 18, last if ($src >= 6881 and $src <= 6888); # bittorrent
                        $ret += 17, last if $src == 5501;                    # hotline

                        last;
                    };

                    # privileged ports: bulk data and important services.
                    # (ftp data, port 20, used to be tested inside the
                    # unprivileged branch above, where it could never match.)
                    $src == 20  and $ret += 15, last; # ftp data
                    $src == 80  and $ret -= 1,  last; # http
                    $src == 443 and $ret -= 1,  last; # https
                    $src == 143 and $ret -= 4,  last; # imap
                    $src == 110 and $ret -= 4,  last; # pop3
                    $src == 25  and $ret -= 5,  last; # smtp
                    $src == 22  and $ret -= 7,  last; # ssh
                    $src == 21  and $ret -= 6,  last; # ftp control
                }

                SWITCH: { # destination port
                    # unprivileged ports (1024 and up)
                    $dst >= 1024 and do {
                        $ret += 3;

                        #print "destination port is unprivileged\n";

                        $ret += 16, last if ($dst >= 6881 and $dst <= 6888) and not ($src >= 6881 and $src <= 6888); # bittorrent
                        $ret += 15, last if $dst == 5501;                                                             # hotline

                        last;
                    };

                    # privileged ports
                    $dst == 20  and $ret += 14, last; # ftp data
                    $dst == 80  and $ret -= 6,  last; # http
                    $dst == 443 and $ret -= 6,  last; # https
                    $dst == 143 and $ret -= 4,  last; # imap
                    $dst == 110 and $ret -= 4,  last; # pop3
                    $dst == 25  and $ret -= 2,  last; # smtp
                    $dst == 22  and $ret -= 10, last; # ssh
                    $dst == 23  and $ret -= 10, last; # telnet
                    $dst == 21  and $ret -= 6,  last; # ftp ctrl
                }

                #print "port score: $ret\n";

                return $ret;
            },
        }),
    );
}
435:
package Rules; # API for joint abstraction - rules depend on common shared data, and may be added and removed.

# A Rules object is an ordered list of rule objects. Each rule object must
# respond to has_deps, strong_deps, needs and evaluate.
#
# Once a dependency has been provided it is not altered, so every rule that
# depends on it may be computed from it without locking. Rule evaluate subs
# are expected to return static or unrelated data.
#
# A rule's dependencies are one of the following (more complex combinations
# can be built with intermediate dependency rules):
#   needs    -> strong list: every entry must be provided before the rule runs
#   wants    -> weak list: the rule runs as soon as any one entry is provided
#
#   evaluate -> runs the rule; returns either a hash ref of newly provided
#               dependency objects (keyed by name) or a numeric score adjustment
#
#   provides -> currently unused; intended as a hint for install()

# new(): an empty rule list.
sub new {
    my $class = shift;
    return bless [], $class;
}

# install(@rules): append rules. They are evaluated in installation order.
# Rules whose dependencies are not available yet are retried later.
sub install {
    my $self = shift;

    # TODO: drop rules that can never have all their dependencies met, and duplicates

    #print "installing score rules\n";

    push @$self, @_;
}

# evaluate($packet): run every installed rule against $packet and return the
# sum of the numeric score adjustments.
#
# Rules whose dependencies are missing are queued. The queue is retried in
# full passes until a pass resolves nothing more. Rules whose dependencies
# are never provided are skipped, e.g. the tcp rules for a udp packet.
sub evaluate {
    my ($self, $packet) = @_;

    my %offers;     # dependency name => object provided for it
    my %deferred;   # dependency name => [ records waiting for it ]
    my @ruleq;      # records still waiting for dependencies
    my $score = 0;

    #print "evaluating entire ruleset\n";

    foreach my $rule (@$self) {
        # record: [ number of unmet dependencies, rule ]
        my $dep = [ 0, $rule ];

        if ($rule->has_deps) {
            my @waiting_on;
            if ($rule->strong_deps) {
                # every missing dependency has to arrive
                @waiting_on = grep { not exists $offers{$_} } $rule->needs;
                $dep->[0] = scalar @waiting_on;
            }
            else {
                # any one of the wanted dependencies will do
                my @wanted = $rule->needs;
                unless (grep { exists $offers{$_} } @wanted) {
                    $dep->[0]   = 1;
                    @waiting_on = @wanted;
                }
            }
            #print "this rule needs (@waiting_on)\n";
            # Keep a list per name: several rules may wait for the same dependency.
            push @{ $deferred{$_} }, $dep for @waiting_on;
        }

        push @ruleq, _try_rule($packet, \$score, \%offers, \%deferred, $dep);
    }

    # Retry the deferred rules until a whole pass makes no progress.
    while (@ruleq) {
        #print "attempting to evaluate remaining rules\n";
        my @pending = @ruleq;
        @ruleq = ();
        foreach my $dep (@pending) {
            push @ruleq, _try_rule($packet, \$score, \%offers, \%deferred, $dep);
        }
        last if @ruleq == @pending;
    }

    #print "Final score is $score\n";
    return $score;
}

# _try_rule($packet, \$score, \%offers, \%deferred, $dep)
#
# If the record $dep has unmet dependencies, returns $dep so the caller can
# requeue it. Otherwise runs the rule and returns the empty list.
#  * A hash-ref result is merged into %offers. Each newly provided name
#    decrements the unmet-dependency count of every record waiting for it,
#    once only.
#  * Any other result is added to $score.
sub _try_rule {
    my ($packet, $score, $offers, $deferred, $dep) = @_;

    #print "trying to evaluate rule\n";
    unless ($dep->[0] < 1) {
        #print "unmet dependancies\n";
        return $dep; # requeue the current one
    }

    #print "all dependancies met\n";
    my $ret = $dep->[1]->evaluate($packet, $offers);
    if (ref $ret) {
        #print "rule introduced new offerings:";
        foreach my $key (keys %$ret) {
            #print " $key,";
            my $is_new = !exists $offers->{$key};
            $offers->{$key} = $ret->{$key}; # install dependencies
            next unless $is_new;

            my $waiting = delete $deferred->{$key} or next;
            $_->[0]-- for @$waiting; # one fewer unmet dependency each
        }
        #print "\n";
    }
    else {
        #print "rule adjusted score by $ret\n";
        $$score += $ret;
    }

    return; # we have nothing to requeue
}
541:
542: ## base packages for rules
543:
package Rule::Simple; # a rule fits into Rules and works via a small API: a leaf in a dependency tree

# new(\%spec): bless the caller's spec hash itself (keys: needs or wants,
# evaluate, optionally provides) into a rule. The hash is not copied, and any
# further arguments are ignored.
sub new {
    my ($class, $spec) = @_;
    return bless $spec, $class;
}

# has_deps(): 1 if the rule declares "needs" or "wants", undef otherwise.
sub has_deps {
    my $self = shift;
    return((exists $self->{needs} || exists $self->{wants}) ? 1 : undef);
}

# strong_deps(): 1 if the dependencies are strong ("needs"), undef otherwise.
sub strong_deps {
    my $self = shift;
    return exists $self->{needs} ? 1 : undef;
}

# needs(): the dependency names, from "needs" if present, else from "wants".
sub needs {
    my $self = shift;
    my $names = exists $self->{needs} ? $self->{needs} : $self->{wants};
    return @$names;
}

# evaluate(@args): tail-call the rule's evaluate code with @args (the rule
# object itself is not passed on).
sub evaluate {
    my $self = shift;
    goto &{ $self->{evaluate} };
}
554:
package Rule::Dependancy::Simple; # a dependency rule produces data that other (dependency or plain) rules need: a node in a dependency tree

use base "Rule::Simple"; # a simple rule that also provides()

# provides(): the list of dependency names this rule declares it offers.
sub provides {
    my $self = shift;
    return @{ $self->{provides} };
}
559:
package Dependancy::Simple; # a dependency is what a dependency rule creates: a base class for dependency objects, basically a blessed hash of plain values

# new(): an empty dependency object.
sub new {
    my $class = shift;
    return bless {}, $class;
}

# set($key, $value): store $value under $key and return it.
sub set {
    my ($self, $key, $value) = @_;
    return $self->{$key} = $value;
}

# get($key): the value stored under $key (undef if never set).
sub get {
    my ($self, $key) = @_;
    return $self->{$key};
}
571:
572: __END__