arcs

Amateur Radio Chat Server with a modern interface
git clone git://squid-tech.com/arcs.git
Log | Files | Refs | README

commit ccecf79697e4c8ebd5a23e50fa1e4036ba398c05
Author: joshiemoore <jxm5210@g.rit.edu>
Date:   Wed, 18 Dec 2019 22:45:44 -0500

Initial commit - able to send, receive, and decode AX.25 packets

Diffstat:
Aarcs.py | 181+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 181 insertions(+), 0 deletions(-)

diff --git a/arcs.py b/arcs.py @@ -0,0 +1,181 @@ +""" +Amateur Radio Chat Server +""" +import sys +import serial + +# replace with your callsign +MYCALL = 'KD2SIX' + +### server config + +# server callsign +HOST = 'KD2SIX-1' + +# server symbol +SYMBOL = '/B' + +# server position (change this to your position!) +SERV_LAT = '4306.50N' +SERV_LONG = '07608.61W' + +# message for position beacon +BEACON_MSG = 'Amateur Radio Chat Server - WIP' + + +KISS_FEND = 0xC0 +KISS_FESC = 0xDB +KISS_TFEND = 0xDC +KISS_TFESC = 0xDD + + +""" +Send command and data to the KISS TNC. +data should be a byte array. +""" +def KISScmd(port, command, data): + out = [] + out.append(0xC0) + + out.append(command) + for d in data: + out.append(d) + + out.append(0xC0) + + port.write(bytearray(out)) + +""" +Initialize the KISS TNC. +""" +def KISSinit(port): + KISScmd(port, 0x01, [40]) # TX_DELAY + KISScmd(port, 0x02, [128]) # P + KISScmd(port, 0x03, [3]) # SlotTime + KISScmd(port, 0x04, [20]) # TXtail + KISScmd(port, 0x05, [0]) # FullDuplex + +""" +AX.25-encode a callsign. +""" +def encodecall(call, final): + if "-" not in call: + call = call + "-0" + cs, ssid = call.split('-') + if len(cs) < 6: + cs = cs + " "*(6 - len(cs)) + encoded_call = [ord(x) << 1 for x in cs[0:6]] + encoded_ssid = (int(ssid) << 1) | 0b01100000 | (0b00000001 if final else 0) + return encoded_call + [encoded_ssid] + +""" +AX.25-decode a callsign. +""" +def decodecall(call): + decodedcall = "" + + # decode callsign + for i in range(0, len(call) - 1): + c = int.from_bytes(call[i], "little") >> 1 + if c != ' ': + decodedcall += chr(c) + + # decode SSID + ssid = (int.from_bytes(call[-1], "little") >> 1) & 0x0F + if ssid != 0: + decodedcall += "-" + str(ssid) + (" " if ssid < 10 else "") + else: + decodedcall += " " + + return decodedcall + +""" +Send an AX.25 packet from source to dest. +""" +def AX25send(port, source, dest, path, msg): + dest_addr = encodecall(dest.upper(), False) + src_addr = encodecall(source.upper(), (path is None) or (len(path) == 0)) + c_byte = [0x03] # UI frame + pid = [0xF0] # no protocol + msg = [ord(c) for c in msg] + packet = dest_addr + src_addr + + # add path + for i in range(len(path)): + packet += encodecall(path[i].upper(), i == len(path) - 1) + + packet += c_byte + pid + msg + + # escape the packet + packet_escaped = [] + for x in packet: + if x == KISS_FEND: + packet_escaped += [KISS_FESC, KISS_TFEND] + elif x == KISS_FESC: + packet_escaped += [KISS_FESC, KISS_TFESC] + else: + packet_escaped += [x] + + KISScmd(port, 0x00, bytearray(packet_escaped)) + +""" +Receive a packet from the KISS TNC as a byte array. +""" +def AX25read(port): + inpacket = [] + write = False + + while 1: + inb = port.read() + + if write: + if inb == b'\xC0': + return inpacket + + inpacket.append(inb) + else: + if inb == b'\xC0': + write = True + +""" +Parse a received packet, returning (dest, source, msg). +""" +def AX25parse(packet): + dest = decodecall(packet[1:8]) + source = decodecall(packet[8:15]) + + msg = "" + + # find the message + for x in range(len(packet)): + if packet[x] == b'\x03' and packet[x + 1] == b'\xF0': + for c in packet[x + 2:]: + msg += chr(int.from_bytes(c, "little")) + break + + return (dest, source, msg) + +""" +Transmit the server's beacon message, position, and symbol. +""" +def beacon(port): + out = "=" + SERV_LAT + SYMBOL[0] + SERV_LONG + SYMBOL[1] + BEACON_MSG + AX25send(port, HOST, "APRS", ["WIDE2-2"], out) + + +def main(): + # initialize serial port + ser = serial.Serial( + port=sys.argv[1], + baudrate=sys.argv[2] + ) + + # initialize the KISS TNC + KISSinit(ser) + + #beacon(ser) + while True: + print(AX25parse(AX25read(ser))) + +if __name__ == '__main__': + main()