Tengo una aplicación que tiene que comunicar una serie de datos por UDP multicast (entre otros modos, pero mi problema es en multicast). Cuando dos nodos intercambian información no hay problema. Pero cuando conecto físicamente el cable del tercero (ni siquiera la aplicación está corriendo), se empiezan a recibir, frenéticamente, en todos los nodos de la red los mismos mensajes una y otra vez y a una velocidad tal que dejan las máquinas totalmente saturadas. Ni os cuento cuando arranco la aplicación en el tercero...
Es como si los mensajes no parasen de reenviarse de un nodo a otro. Los tres nodos están conectados a un mismo switch (o router, no sé lo que es ¿es importante?)
Es la primera aplicación con sockets que hago así que no me extrañaría que los manejara mal... Quizás el problema tendrá que ver con la forma que tengo de enchufar los mensajes en la red... no lo sé... la verdad es que ando muy perdido, no entiendo cómo se puede producir esta situación.
Pego aquí el código que utilizo para enviar y recibir mensajes:
Enviar mensajes:
Código:
Hilo de escucha:private synchronized boolean sendMessage(XXXMessageDocument message, short dc, short ds, short dss) throws IOException, NodeNotFoundException { // Obtener IP y puerto del destinatario String ip = nodeList.getUdpIpNode(dc, ds, dss); int port = nodeList.getUdpPortNode(dc, ds, dss); InetAddress address = InetAddress.getByName(ip); try { if (aSocket == null || aSocket.isClosed()) { if (SocketTypeEnum.UNICAST.equals(socketType)) { aSocket = new DatagramSocket(); aSocket.setBroadcast(false); log.debug("Nuevo socket UDP unicast abierto"); } else if (SocketTypeEnum.BROADCAST.equals(socketType)) { aSocket = new DatagramSocket(); aSocket.setBroadcast(true); log.debug("Nuevo socket UDP broadcast abierto"); } else if (SocketTypeEnum.MULTICAST.equals(socketType)) { aSocket = new MulticastSocket(port); ((MulticastSocket)aSocket).joinGroup(address); ((MulticastSocket)aSocket).setTimeToLive(multicastTtl); log.debug("Nuevo socket UDP multicast abierto"); } } XXXDatagram datagram = new XXXDatagram(sc, ss, sss, packetSize, firstSegmentNumber, msgTtl); datagram.setCompression(encoding); datagram.setPriority(priority); byte[][] datagramBytes = datagram.encodeDatagram(message, dc, ds, dss); for (byte[] segment : datagramBytes) { flowControl.allowSend(); log.debug("Enviando paquete por " + ip +":"+ port); DatagramPacket d = new DatagramPacket(segment, segment.length, address, port); aSocket.send(d); } log.info("Enviado mensaje a " + dc +":"+ ds +":"+ dss); retries = 0; } catch (ConnectException e) { retries++; if (retries == 1) log.warn("No se puede conectar con "+ip+":"+port+", encolamos mensaje"); else log.debug("No se puede conectar con "+ip+":"+port+", encolamos mensaje"); Sending m = new Sending(message, dc, ds, dss); tail.push(m); } return true; }
Código:
Gracias por vuestra atención.public void run() { log.info("Conectando servidor " + ip + ":" + port + "..."); try { connected = true; // Inicializar almacen de datagramas timeoutControl = new TtlDatagramControlThread(packetSize, firstSegmentNumber, msgTtl); timeoutControl.start(); while (connected) { try { if (aSocket == null || aSocket.isClosed()) { if (SocketTypeEnum.UNICAST.equals(socketType)) { aSocket = new DatagramSocket(port); aSocket.setBroadcast(false); } else if (SocketTypeEnum.BROADCAST.equals(socketType)) { aSocket = new DatagramSocket(port); aSocket.setBroadcast(true); } else if (SocketTypeEnum.MULTICAST.equals(socketType)) { aSocket = new MulticastSocket(port); InetAddress multicast = InetAddress.getByName(ip); ((MulticastSocket)aSocket).joinGroup(multicast); } log.info("Servidor levantado."); } byte[] buffer = new byte[TAM_BUFFER]; DatagramPacket p = new DatagramPacket(buffer, buffer.length); aSocket.receive(p); // Nos bloqueamos hasta recibir un paquete MessageProcessingThread thread = new MessageProcessingThread(p.getData(), p.getLength()); thread.start(); } catch (Exception e) { log.error("Error inesperado", e); // Esperamos antes de volver abrir el socket try { sleep(5000); } catch (InterruptedException e1) { log.error("Error inesperado", e); } } } } finally { if (aSocket != null) aSocket.close(); onServerDown(); } }
Saludos!!