9
3

TCPConnection.cpp 43 KB


  1. /*
  2. EQ2Emulator: Everquest II Server Emulator
  3. Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net)
  4. This file is part of EQ2Emulator.
  5. EQ2Emulator is free software: you can redistribute it and/or modify
  6. it under the terms of the GNU General Public License as published by
  7. the Free Software Foundation, either version 3 of the License, or
  8. (at your option) any later version.
  9. EQ2Emulator is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. GNU General Public License for more details.
  13. You should have received a copy of the GNU General Public License
  14. along with EQ2Emulator. If not, see <http://www.gnu.org/licenses/>.
  15. */
  16. #include "../common/debug.h"
  17. #include <iostream>
  18. using namespace std;
  19. #include <string.h>
  20. #include <stdio.h>
  21. #include <iomanip>
  22. using namespace std;
  23. #include "TCPConnection.h"
  24. #include "../common/servertalk.h"
  25. #include "../common/timer.h"
  26. #include "../common/packet_dump.h"
  27. #include "Log.h"
  28. #ifdef FREEBSD //Timothy Whitman - January 7, 2003
  29. #define MSG_NOSIGNAL 0
  30. #endif
  31. #ifdef WIN32
  32. InitWinsock winsock;
  33. #endif
  34. #define LOOP_GRANULARITY 3 //# of ms between checking our socket/queues
  35. #define SERVER_LOOP_GRANULARITY 3 //# of ms between checking our socket/queues
  36. #define TCPN_DEBUG 0
  37. #define TCPN_DEBUG_Console 0
  38. #define TCPN_DEBUG_Memory 0
  39. #define TCPN_LOG_PACKETS 0
  40. #define TCPN_LOG_RAW_DATA_OUT 0
  41. #define TCPN_LOG_RAW_DATA_IN 0
  42. TCPConnection::TCPNetPacket_Struct* TCPConnection::MakePacket(ServerPacket* pack, int32 iDestination) {
  43. sint32 size = sizeof(TCPNetPacket_Struct) + pack->size;
  44. if (pack->compressed) {
  45. size += 4;
  46. }
  47. if (iDestination) {
  48. size += 4;
  49. }
  50. TCPNetPacket_Struct* tnps = (TCPNetPacket_Struct*) new uchar[size];
  51. tnps->size = size;
  52. tnps->opcode = pack->opcode;
  53. *((int8*) &tnps->flags) = 0;
  54. uchar* buffer = tnps->buffer;
  55. if (pack->compressed) {
  56. tnps->flags.compressed = 1;
  57. *((sint32*) buffer) = pack->InflatedSize;
  58. buffer += 4;
  59. }
  60. if (iDestination) {
  61. tnps->flags.destination = 1;
  62. *((sint32*) buffer) = iDestination;
  63. buffer += 4;
  64. }
  65. memcpy(buffer, pack->pBuffer, pack->size);
  66. return tnps;
  67. }
  68. TCPConnection::TCPConnection(bool iOldFormat, TCPServer* iRelayServer, eTCPMode iMode) {
  69. id = 0;
  70. Server = iRelayServer;
  71. if (Server)
  72. RelayServer = true;
  73. else
  74. RelayServer = false;
  75. RelayLink = 0;
  76. RelayCount = 0;
  77. RemoteID = 0;
  78. pOldFormat = iOldFormat;
  79. ConnectionType = Outgoing;
  80. TCPMode = iMode;
  81. pState = TCPS_Ready;
  82. pFree = false;
  83. pEcho = false;
  84. sock = 0;
  85. rIP = 0;
  86. rPort = 0;
  87. keepalive_timer = new Timer(SERVER_TIMEOUT);
  88. timeout_timer = new Timer(SERVER_TIMEOUT * 2);
  89. recvbuf = 0;
  90. sendbuf = 0;
  91. pRunLoop = false;
  92. charAsyncConnect = 0;
  93. pAsyncConnect = false;
  94. connection_socket = 0;
  95. recvbuf_size = 0;
  96. recvbuf_used = 0;
  97. recvbuf_echo = 0;
  98. sendbuf_size = 0;
  99. sendbuf_used = 0;
  100. #if TCPN_DEBUG_Memory >= 7
  101. cout << "Constructor #1 on outgoing TCP# " << GetID() << endl;
  102. #endif
  103. }
  104. TCPConnection::TCPConnection(TCPServer* iServer, SOCKET in_socket, int32 irIP, int16 irPort, bool iOldFormat) {
  105. Server = iServer;
  106. RelayLink = 0;
  107. RelayServer = false;
  108. RelayCount = 0;
  109. RemoteID = 0;
  110. id = Server->GetNextID();
  111. ConnectionType = Incomming;
  112. pOldFormat = iOldFormat;
  113. TCPMode = modePacket;
  114. pState = TCPS_Connected;
  115. pFree = false;
  116. pEcho = false;
  117. sock = 0;
  118. connection_socket = in_socket;
  119. rIP = irIP;
  120. rPort = irPort;
  121. keepalive_timer = new Timer(SERVER_TIMEOUT);
  122. timeout_timer = new Timer(SERVER_TIMEOUT * 2);
  123. recvbuf = 0;
  124. sendbuf = 0;
  125. pRunLoop = false;
  126. charAsyncConnect = 0;
  127. pAsyncConnect = false;
  128. recvbuf_size = 0;
  129. recvbuf_used = 0;
  130. recvbuf_echo = 0;
  131. sendbuf_size = 0;
  132. sendbuf_used = 0;
  133. #if TCPN_DEBUG_Memory >= 7
  134. cout << "Constructor #2 on outgoing TCP# " << GetID() << endl;
  135. #endif
  136. }
  137. TCPConnection::TCPConnection(TCPServer* iServer, TCPConnection* iRelayLink, int32 iRemoteID, int32 irIP, int16 irPort) {
  138. Server = iServer;
  139. RelayLink = iRelayLink;
  140. RelayServer = true;
  141. id = Server->GetNextID();
  142. RelayCount = 0;
  143. RemoteID = iRemoteID;
  144. if (!RemoteID)
  145. ThrowError("Error: TCPConnection: RemoteID == 0 on RelayLink constructor");
  146. pOldFormat = false;
  147. ConnectionType = Incomming;
  148. TCPMode = modePacket;
  149. pState = TCPS_Connected;
  150. pFree = false;
  151. pEcho = false;
  152. sock = 0;
  153. connection_socket = 0;
  154. rIP = irIP;
  155. rPort = irPort;
  156. keepalive_timer = 0;
  157. timeout_timer = 0;
  158. recvbuf = 0;
  159. sendbuf = 0;
  160. pRunLoop = false;
  161. charAsyncConnect = 0;
  162. pAsyncConnect = false;
  163. recvbuf_size = 0;
  164. recvbuf_used = 0;
  165. recvbuf_echo = 0;
  166. sendbuf_size = 0;
  167. sendbuf_used = 0;
  168. #if TCPN_DEBUG_Memory >= 7
  169. cout << "Constructor #3 on outgoing TCP# " << GetID() << endl;
  170. #endif
  171. }
  172. TCPConnection::~TCPConnection() {
  173. Disconnect();
  174. ClearBuffers();
  175. if (ConnectionType == Outgoing) {
  176. MRunLoop.lock();
  177. pRunLoop = false;
  178. MRunLoop.unlock();
  179. MLoopRunning.lock();
  180. MLoopRunning.unlock();
  181. #if TCPN_DEBUG_Memory >= 6
  182. cout << "Deconstructor on outgoing TCP# " << GetID() << endl;
  183. #endif
  184. }
  185. #if TCPN_DEBUG_Memory >= 5
  186. else {
  187. cout << "Deconstructor on incomming TCP# " << GetID() << endl;
  188. }
  189. #endif
  190. safe_delete(keepalive_timer);
  191. safe_delete(timeout_timer);
  192. safe_delete_array(recvbuf);
  193. safe_delete_array(sendbuf);
  194. safe_delete_array(charAsyncConnect);
  195. }
  196. void TCPConnection::SetState(int8 in_state) {
  197. MState.lock();
  198. pState = in_state;
  199. MState.unlock();
  200. }
  201. int8 TCPConnection::GetState() {
  202. int8 ret;
  203. MState.lock();
  204. ret = pState;
  205. MState.unlock();
  206. return ret;
  207. }
  208. void TCPConnection::Free() {
  209. if (ConnectionType == Outgoing) {
  210. ThrowError("TCPConnection::Free() called on an Outgoing connection");
  211. }
  212. #if TCPN_DEBUG_Memory >= 5
  213. cout << "Free on TCP# " << GetID() << endl;
  214. #endif
  215. Disconnect();
  216. pFree = true;
  217. }
  218. bool TCPConnection::SendPacket(ServerPacket* pack, int32 iDestination) {
  219. LockMutex lock(&MState);
  220. if (!Connected())
  221. return false;
  222. eTCPMode tmp = GetMode();
  223. if (tmp != modePacket && tmp != modeTransition)
  224. return false;
  225. if (RemoteID)
  226. return RelayLink->SendPacket(pack, RemoteID);
  227. else {
  228. TCPNetPacket_Struct* tnps = MakePacket(pack, iDestination);
  229. if (tmp == modeTransition) {
  230. InModeQueuePush(tnps);
  231. }
  232. else {
  233. #if TCPN_LOG_PACKETS >= 1
  234. if (pack && pack->opcode != 0) {
  235. struct in_addr in;
  236. in.s_addr = GetrIP();
  237. CoutTimestamp(true);
  238. cout << ": Logging outgoing TCP packet. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
  239. #if TCPN_LOG_PACKETS == 2
  240. if (pack->size >= 32)
  241. DumpPacket(pack->pBuffer, 32);
  242. else
  243. DumpPacket(pack);
  244. #endif
  245. #if TCPN_LOG_PACKETS >= 3
  246. DumpPacket(pack);
  247. #endif
  248. }
  249. #endif
  250. ServerSendQueuePushEnd((uchar**) &tnps, tnps->size);
  251. }
  252. }
  253. return true;
  254. }
  255. bool TCPConnection::SendPacket(TCPNetPacket_Struct* tnps) {
  256. LockMutex lock(&MState);
  257. if (RemoteID)
  258. return false;
  259. if (!Connected())
  260. return false;
  261. eTCPMode tmp = GetMode();
  262. if (tmp == modeTransition) {
  263. TCPNetPacket_Struct* tnps2 = (TCPNetPacket_Struct*) new uchar[tnps->size];
  264. memcpy(tnps2, tnps, tnps->size);
  265. InModeQueuePush(tnps2);
  266. return true;
  267. }
  268. if (GetMode() != modePacket)
  269. return false;
  270. #if TCPN_LOG_PACKETS >= 1
  271. if (tnps && tnps->opcode != 0) {
  272. struct in_addr in;
  273. in.s_addr = GetrIP();
  274. CoutTimestamp(true);
  275. cout << ": Logging outgoing TCP NetPacket. OPCode: 0x" << hex << setw(4) << setfill('0') << tnps->opcode << dec << ", size: " << setw(5) << setfill(' ') << tnps->size << " " << inet_ntoa(in) << ":" << GetrPort();
  276. if (pOldFormat)
  277. cout << " (OldFormat)";
  278. cout << endl;
  279. #if TCPN_LOG_PACKETS == 2
  280. if (tnps->size >= 32)
  281. DumpPacket((uchar*) tnps, 32);
  282. else
  283. DumpPacket((uchar*) tnps, tnps->size);
  284. #endif
  285. #if TCPN_LOG_PACKETS >= 3
  286. DumpPacket((uchar*) tnps, tnps->size);
  287. #endif
  288. }
  289. #endif
  290. ServerSendQueuePushEnd((const uchar*) tnps, tnps->size);
  291. return true;
  292. }
  293. bool TCPConnection::Send(const uchar* data, sint32 size) {
  294. if (!Connected())
  295. return false;
  296. if (GetMode() != modeConsole)
  297. return false;
  298. if (!size)
  299. return true;
  300. ServerSendQueuePushEnd(data, size);
  301. return true;
  302. }
  303. void TCPConnection::InModeQueuePush(TCPNetPacket_Struct* tnps) {
  304. MSendQueue.lock();
  305. InModeQueue.push(tnps);
  306. MSendQueue.unlock();
  307. }
  308. void TCPConnection::ServerSendQueuePushEnd(const uchar* data, sint32 size) {
  309. MSendQueue.lock();
  310. if (sendbuf == 0) {
  311. sendbuf = new uchar[size];
  312. sendbuf_size = size;
  313. sendbuf_used = 0;
  314. }
  315. else if (size > (sendbuf_size - sendbuf_used)) {
  316. sendbuf_size += size + 1024;
  317. uchar* tmp = new uchar[sendbuf_size];
  318. memcpy(tmp, sendbuf, sendbuf_used);
  319. safe_delete_array(sendbuf);
  320. sendbuf = tmp;
  321. }
  322. memcpy(&sendbuf[sendbuf_used], data, size);
  323. sendbuf_used += size;
  324. MSendQueue.unlock();
  325. }
  326. void TCPConnection::ServerSendQueuePushEnd(uchar** data, sint32 size) {
  327. MSendQueue.lock();
  328. if (sendbuf == 0) {
  329. sendbuf = *data;
  330. sendbuf_size = size;
  331. sendbuf_used = size;
  332. MSendQueue.unlock();
  333. *data = 0;
  334. return;
  335. }
  336. if (size > (sendbuf_size - sendbuf_used)) {
  337. sendbuf_size += size;
  338. uchar* tmp = new uchar[sendbuf_size];
  339. memcpy(tmp, sendbuf, sendbuf_used);
  340. safe_delete_array(sendbuf);
  341. sendbuf = tmp;
  342. }
  343. memcpy(&sendbuf[sendbuf_used], *data, size);
  344. sendbuf_used += size;
  345. MSendQueue.unlock();
  346. delete[] (TCPNetPacket_Struct*)*data;
  347. }
  348. void TCPConnection::ServerSendQueuePushFront(uchar* data, sint32 size) {
  349. MSendQueue.lock();
  350. if (sendbuf == 0) {
  351. sendbuf = new uchar[size];
  352. sendbuf_size = size;
  353. sendbuf_used = 0;
  354. }
  355. else if (size > (sendbuf_size - sendbuf_used)) {
  356. sendbuf_size += size;
  357. uchar* tmp = new uchar[sendbuf_size];
  358. memcpy(&tmp[size], sendbuf, sendbuf_used);
  359. safe_delete_array(sendbuf);
  360. sendbuf = tmp;
  361. }
  362. memcpy(sendbuf, data, size);
  363. sendbuf_used += size;
  364. MSendQueue.unlock();
  365. }
  366. bool TCPConnection::ServerSendQueuePop(uchar** data, sint32* size) {
  367. bool ret;
  368. if (!MSendQueue.trylock())
  369. return false;
  370. if (sendbuf) {
  371. *data = sendbuf;
  372. *size = sendbuf_used;
  373. sendbuf = 0;
  374. ret = true;
  375. }
  376. else {
  377. ret = false;
  378. }
  379. MSendQueue.unlock();
  380. return ret;
  381. }
  382. ServerPacket* TCPConnection::PopPacket() {
  383. ServerPacket* ret;
  384. if (!MOutQueueLock.trylock())
  385. return 0;
  386. ret = OutQueue.pop();
  387. MOutQueueLock.unlock();
  388. return ret;
  389. }
  390. char* TCPConnection::PopLine() {
  391. char* ret;
  392. if (!MOutQueueLock.trylock())
  393. return 0;
  394. ret = (char*) LineOutQueue.pop();
  395. MOutQueueLock.unlock();
  396. return ret;
  397. }
  398. void TCPConnection::OutQueuePush(ServerPacket* pack) {
  399. MOutQueueLock.lock();
  400. OutQueue.push(pack);
  401. MOutQueueLock.unlock();
  402. }
  403. void TCPConnection::LineOutQueuePush(char* line) {
  404. #if defined(GOTFRAGS) && 0
  405. if (strcmp(line, "**CRASHME**") == 0) {
  406. int i = 0;
  407. cout << (5 / i) << endl;
  408. }
  409. #endif
  410. if (strcmp(line, "**PACKETMODE**") == 0) {
  411. MSendQueue.lock();
  412. safe_delete_array(sendbuf);
  413. if (TCPMode == modeConsole)
  414. Send((const uchar*) "\0**PACKETMODE**\r", 16);
  415. TCPMode = modePacket;
  416. TCPNetPacket_Struct* tnps = 0;
  417. while ((tnps = InModeQueue.pop())) {
  418. SendPacket(tnps);
  419. safe_delete_array(tnps);
  420. }
  421. MSendQueue.unlock();
  422. safe_delete_array(line);
  423. return;
  424. }
  425. MOutQueueLock.lock();
  426. LineOutQueue.push(line);
  427. MOutQueueLock.unlock();
  428. }
  429. void TCPConnection::Disconnect(bool iSendRelayDisconnect) {
  430. if (connection_socket != INVALID_SOCKET && connection_socket != 0) {
  431. MState.lock();
  432. if (pState == TCPS_Connected || pState == TCPS_Disconnecting || pState == TCPS_Disconnected)
  433. SendData();
  434. pState = TCPS_Closing;
  435. MState.unlock();
  436. shutdown(connection_socket, 0x01);
  437. shutdown(connection_socket, 0x00);
  438. #ifdef WIN32
  439. closesocket(connection_socket);
  440. #else
  441. close(connection_socket);
  442. #endif
  443. connection_socket = 0;
  444. rIP = 0;
  445. rPort = 0;
  446. ClearBuffers();
  447. }
  448. SetState(TCPS_Ready);
  449. if (RelayLink) {
  450. RelayLink->RemoveRelay(this, iSendRelayDisconnect);
  451. RelayLink = 0;
  452. }
  453. }
  454. bool TCPConnection::GetAsyncConnect() {
  455. bool ret;
  456. MAsyncConnect.lock();
  457. ret = pAsyncConnect;
  458. MAsyncConnect.unlock();
  459. return ret;
  460. }
  461. bool TCPConnection::SetAsyncConnect(bool iValue) {
  462. bool ret;
  463. MAsyncConnect.lock();
  464. ret = pAsyncConnect;
  465. pAsyncConnect = iValue;
  466. MAsyncConnect.unlock();
  467. return ret;
  468. }
  469. void TCPConnection::AsyncConnect(char* irAddress, int16 irPort) {
  470. if (ConnectionType != Outgoing) {
  471. // If this code runs, we got serious problems
  472. // Crash and burn.
  473. ThrowError("TCPConnection::AsyncConnect() call on a Incomming connection object!");
  474. return;
  475. }
  476. if (GetState() != TCPS_Ready)
  477. return;
  478. MAsyncConnect.lock();
  479. if (pAsyncConnect) {
  480. MAsyncConnect.unlock();
  481. return;
  482. }
  483. pAsyncConnect = true;
  484. safe_delete_array(charAsyncConnect);
  485. charAsyncConnect = new char[strlen(irAddress) + 1];
  486. strcpy(charAsyncConnect, irAddress);
  487. rPort = irPort;
  488. MAsyncConnect.unlock();
  489. if (!pRunLoop) {
  490. pRunLoop = true;
  491. #ifdef WIN32
  492. _beginthread(TCPConnectionLoop, 0, this);
  493. #else
  494. pthread_t thread;
  495. pthread_create(&thread, NULL, TCPConnectionLoop, this);
  496. pthread_detach(thread);
  497. #endif
  498. }
  499. return;
  500. }
  501. void TCPConnection::AsyncConnect(int32 irIP, int16 irPort) {
  502. if (ConnectionType != Outgoing) {
  503. // If this code runs, we got serious problems
  504. // Crash and burn.
  505. ThrowError("TCPConnection::AsyncConnect() call on a Incomming connection object!");
  506. return;
  507. }
  508. if (GetState() != TCPS_Ready)
  509. return;
  510. MAsyncConnect.lock();
  511. if (pAsyncConnect) {
  512. MAsyncConnect.unlock();
  513. return;
  514. }
  515. pAsyncConnect = true;
  516. safe_delete(charAsyncConnect);
  517. rIP = irIP;
  518. rPort = irPort;
  519. MAsyncConnect.unlock();
  520. if (!pRunLoop) {
  521. pRunLoop = true;
  522. #ifdef WIN32
  523. _beginthread(TCPConnectionLoop, 0, this);
  524. #else
  525. pthread_t thread;
  526. pthread_create(&thread, NULL, TCPConnectionLoop, this);
  527. pthread_detach(thread);
  528. #endif
  529. }
  530. return;
  531. }
  532. bool TCPConnection::Connect(char* irAddress, int16 irPort, char* errbuf) {
  533. if (errbuf)
  534. errbuf[0] = 0;
  535. int32 tmpIP = ResolveIP(irAddress);
  536. if (!tmpIP) {
  537. if (errbuf) {
  538. #ifdef WIN32
  539. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Couldnt resolve hostname. Error: %i", WSAGetLastError());
  540. #else
  541. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Couldnt resolve hostname. Error #%i: %s", errno, strerror(errno));
  542. #endif
  543. }
  544. return false;
  545. }
  546. return Connect(tmpIP, irPort, errbuf);
  547. }
  548. bool TCPConnection::Connect(int32 in_ip, int16 in_port, char* errbuf) {
  549. if (errbuf)
  550. errbuf[0] = 0;
  551. if (ConnectionType != Outgoing) {
  552. // If this code runs, we got serious problems
  553. // Crash and burn.
  554. ThrowError("TCPConnection::Connect() call on a Incomming connection object!");
  555. return false;
  556. }
  557. MState.lock();
  558. if (pState == TCPS_Ready) {
  559. pState = TCPS_Connecting;
  560. }
  561. else {
  562. MState.unlock();
  563. SetAsyncConnect(false);
  564. return false;
  565. }
  566. MState.unlock();
  567. if (!pRunLoop) {
  568. pRunLoop = true;
  569. #ifdef WIN32
  570. _beginthread(TCPConnectionLoop, 0, this);
  571. #else
  572. pthread_t thread;
  573. pthread_create(&thread, NULL, TCPConnectionLoop, this);
  574. pthread_detach(thread);
  575. #endif
  576. }
  577. connection_socket = INVALID_SOCKET;
  578. struct sockaddr_in server_sin;
  579. // struct in_addr in;
  580. if ((connection_socket = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET || connection_socket == 0) {
  581. #ifdef WIN32
  582. if (errbuf)
  583. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Allocating socket failed. Error: %i", WSAGetLastError());
  584. #else
  585. if (errbuf)
  586. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Allocating socket failed. Error: %s", strerror(errno));
  587. #endif
  588. SetState(TCPS_Ready);
  589. SetAsyncConnect(false);
  590. return false;
  591. }
  592. server_sin.sin_family = AF_INET;
  593. server_sin.sin_addr.s_addr = in_ip;
  594. server_sin.sin_port = htons(in_port);
  595. // Establish a connection to the server socket.
  596. #ifdef WIN32
  597. if (connect(connection_socket, (PSOCKADDR) &server_sin, sizeof (server_sin)) == SOCKET_ERROR) {
  598. if (errbuf)
  599. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): connect() failed. Error: %i", WSAGetLastError());
  600. closesocket(connection_socket);
  601. connection_socket = 0;
  602. SetState(TCPS_Ready);
  603. SetAsyncConnect(false);
  604. return false;
  605. }
  606. #else
  607. if (connect(connection_socket, (struct sockaddr *) &server_sin, sizeof (server_sin)) == SOCKET_ERROR) {
  608. if (errbuf)
  609. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): connect() failed. Error: %s", strerror(errno));
  610. close(connection_socket);
  611. connection_socket = 0;
  612. SetState(TCPS_Ready);
  613. SetAsyncConnect(false);
  614. return false;
  615. }
  616. #endif
  617. int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
  618. setsockopt(connection_socket, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
  619. #ifdef WIN32
  620. unsigned long nonblocking = 1;
  621. ioctlsocket(connection_socket, FIONBIO, &nonblocking);
  622. #else
  623. fcntl(connection_socket, F_SETFL, O_NONBLOCK);
  624. #endif
  625. SetEcho(false);
  626. MSendQueue.lock();
  627. ClearBuffers();
  628. TCPMode = modePacket;
  629. MSendQueue.unlock();
  630. rIP = in_ip;
  631. rPort = in_port;
  632. SetState(TCPS_Connected);
  633. SetAsyncConnect(false);
  634. return true;
  635. }
  636. void TCPConnection::ClearBuffers() {
  637. LockMutex lock1(&MSendQueue);
  638. LockMutex lock2(&MOutQueueLock);
  639. LockMutex lock3(&MRunLoop);
  640. LockMutex lock4(&MState);
  641. safe_delete_array(recvbuf);
  642. safe_delete_array(sendbuf);
  643. ServerPacket* pack = 0;
  644. while ((pack = PopPacket()))
  645. safe_delete(pack);
  646. TCPNetPacket_Struct* tnps = 0;
  647. while ((tnps = InModeQueue.pop()))
  648. safe_delete(tnps);
  649. char* line = 0;
  650. while ((line = LineOutQueue.pop()))
  651. safe_delete_array(line);
  652. keepalive_timer->Start();
  653. timeout_timer->Start();
  654. }
  655. bool TCPConnection::CheckNetActive() {
  656. MState.lock();
  657. if (pState == TCPS_Connected || pState == TCPS_Disconnecting) {
  658. MState.unlock();
  659. return true;
  660. }
  661. MState.unlock();
  662. return false;
  663. }
  664. bool TCPConnection::Process() {
  665. char errbuf[TCPConnection_ErrorBufferSize];
  666. if (!CheckNetActive()) {
  667. if (ConnectionType == Outgoing) {
  668. if (GetAsyncConnect()) {
  669. if (charAsyncConnect)
  670. rIP = ResolveIP(charAsyncConnect);
  671. Connect(rIP, rPort);
  672. }
  673. }
  674. if (GetState() == TCPS_Disconnected) {
  675. Disconnect();
  676. return false;
  677. }
  678. else if (GetState() == TCPS_Connecting)
  679. return true;
  680. else
  681. return false;
  682. }
  683. if (!SendData(errbuf)) {
  684. struct in_addr in;
  685. in.s_addr = GetrIP();
  686. cout << inet_ntoa(in) << ":" << GetrPort() << ": " << errbuf << endl;
  687. return false;
  688. }
  689. if (!Connected())
  690. return false;
  691. if (!RecvData(errbuf)) {
  692. struct in_addr in;
  693. in.s_addr = GetrIP();
  694. cout << inet_ntoa(in) << ":" << GetrPort() << ": " << errbuf << endl;
  695. return false;
  696. }
  697. return true;
  698. }
  699. bool TCPConnection::RecvData(char* errbuf) {
  700. if (errbuf)
  701. errbuf[0] = 0;
  702. if (!Connected()) {
  703. return false;
  704. }
  705. int status = 0;
  706. if (recvbuf == 0) {
  707. recvbuf = new uchar[5120];
  708. recvbuf_size = 5120;
  709. recvbuf_used = 0;
  710. recvbuf_echo = 0;
  711. }
  712. else if ((recvbuf_size - recvbuf_used) < 2048) {
  713. uchar* tmpbuf = new uchar[recvbuf_size + 5120];
  714. memcpy(tmpbuf, recvbuf, recvbuf_used);
  715. recvbuf_size += 5120;
  716. safe_delete_array(recvbuf);
  717. recvbuf = tmpbuf;
  718. if (recvbuf_size >= MaxTCPReceiveBufferSize) {
  719. if (errbuf)
  720. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): recvbuf_size >= MaxTCPReceiveBufferSize");
  721. return false;
  722. }
  723. }
  724. status = recv(connection_socket, (char *) &recvbuf[recvbuf_used], (recvbuf_size - recvbuf_used), 0);
  725. if (status >= 1) {
  726. #if TCPN_LOG_RAW_DATA_IN >= 1
  727. struct in_addr in;
  728. in.s_addr = GetrIP();
  729. CoutTimestamp(true);
  730. cout << ": Read " << status << " bytes from network. (recvbuf_used = " << recvbuf_used << ") " << inet_ntoa(in) << ":" << GetrPort();
  731. if (pOldFormat)
  732. cout << " (OldFormat)";
  733. cout << endl;
  734. #if TCPN_LOG_RAW_DATA_IN == 2
  735. sint32 tmp = status;
  736. if (tmp > 32)
  737. tmp = 32;
  738. DumpPacket(&recvbuf[recvbuf_used], status);
  739. #elif TCPN_LOG_RAW_DATA_IN >= 3
  740. DumpPacket(&recvbuf[recvbuf_used], status);
  741. #endif
  742. #endif
  743. recvbuf_used += status;
  744. timeout_timer->Start();
  745. if (!ProcessReceivedData(errbuf))
  746. return false;
  747. }
  748. else if (status == SOCKET_ERROR) {
  749. #ifdef WIN32
  750. if (!(WSAGetLastError() == WSAEWOULDBLOCK)) {
  751. if (errbuf)
  752. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Error: %i", WSAGetLastError());
  753. return false;
  754. }
  755. #else
  756. if (!(errno == EWOULDBLOCK)) {
  757. if (errbuf)
  758. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Error: %s", strerror(errno));
  759. return false;
  760. }
  761. #endif
  762. }
  763. if ((TCPMode == modePacket || TCPMode == modeTransition) && timeout_timer->Check()) {
  764. if (errbuf)
  765. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Connection timeout");
  766. return false;
  767. }
  768. return true;
  769. }
  770. bool TCPConnection::GetEcho() {
  771. bool ret;
  772. MEcho.lock();
  773. ret = pEcho;
  774. MEcho.unlock();
  775. return ret;
  776. }
  777. void TCPConnection::SetEcho(bool iValue) {
  778. MEcho.lock();
  779. pEcho = iValue;
  780. MEcho.unlock();
  781. }
  782. bool TCPConnection::ProcessReceivedData(char* errbuf) {
  783. if (errbuf)
  784. errbuf[0] = 0;
  785. if (!recvbuf)
  786. return true;
  787. if (TCPMode == modePacket) {
  788. //if (pOldFormat)
  789. // return ProcessReceivedDataAsOldPackets(errbuf);
  790. //else
  791. return ProcessReceivedDataAsPackets(errbuf);
  792. }
  793. else {
  794. #if TCPN_DEBUG_Console >= 4
  795. if (recvbuf_used) {
  796. cout << "Starting Processing: recvbuf=" << recvbuf_used << endl;
  797. DumpPacket(recvbuf, recvbuf_used);
  798. }
  799. #endif
  800. for (int i=0; i < recvbuf_used; i++) {
  801. if (GetEcho() && i >= recvbuf_echo) {
  802. Send(&recvbuf[i], 1);
  803. recvbuf_echo = i + 1;
  804. }
  805. switch(recvbuf[i]) {
  806. case 0: { // 0 is the code for clear buffer
  807. if (i==0) {
  808. recvbuf_used--;
  809. recvbuf_echo--;
  810. memcpy(recvbuf, &recvbuf[1], recvbuf_used);
  811. i = -1;
  812. } else {
  813. if (i == recvbuf_used) {
  814. safe_delete_array(recvbuf);
  815. i = -1;
  816. }
  817. else {
  818. uchar* tmpdel = recvbuf;
  819. recvbuf = new uchar[recvbuf_size];
  820. memcpy(recvbuf, &tmpdel[i+1], recvbuf_used-i);
  821. recvbuf_used -= i + 1;
  822. recvbuf_echo -= i + 1;
  823. safe_delete(tmpdel);
  824. i = -1;
  825. }
  826. }
  827. #if TCPN_DEBUG_Console >= 5
  828. cout << "Removed 0x00" << endl;
  829. if (recvbuf_used) {
  830. cout << "recvbuf left: " << recvbuf_used << endl;
  831. DumpPacket(recvbuf, recvbuf_used);
  832. }
  833. else
  834. cout << "recbuf left: None" << endl;
  835. #endif
  836. break;
  837. }
  838. case 10:
  839. case 13: // newline marker
  840. {
  841. if (i==0) { // empty line
  842. recvbuf_used--;
  843. recvbuf_echo--;
  844. memcpy(recvbuf, &recvbuf[1], recvbuf_used);
  845. i = -1;
  846. } else {
  847. char* line = new char[i+1];
  848. memset(line, 0, i+1);
  849. memcpy(line, recvbuf, i);
  850. #if TCPN_DEBUG_Console >= 3
  851. cout << "Line Out: " << endl;
  852. DumpPacket((uchar*) line, i);
  853. #endif
  854. //line[i] = 0;
  855. uchar* tmpdel = recvbuf;
  856. recvbuf = new uchar[recvbuf_size];
  857. recvbuf_used -= i+1;
  858. recvbuf_echo -= i+1;
  859. memcpy(recvbuf, &tmpdel[i+1], recvbuf_used);
  860. #if TCPN_DEBUG_Console >= 5
  861. cout << "i+1=" << i+1 << endl;
  862. if (recvbuf_used) {
  863. cout << "recvbuf left: " << recvbuf_used << endl;
  864. DumpPacket(recvbuf, recvbuf_used);
  865. }
  866. else
  867. cout << "recbuf left: None" << endl;
  868. #endif
  869. safe_delete(tmpdel);
  870. if (strlen(line) > 0)
  871. LineOutQueuePush(line);
  872. else
  873. safe_delete_array(line);
  874. if (TCPMode == modePacket) {
  875. return ProcessReceivedDataAsPackets(errbuf);
  876. }
  877. i = -1;
  878. }
  879. break;
  880. }
  881. case 8: // backspace
  882. {
  883. if (i==0) { // nothin to backspace
  884. recvbuf_used--;
  885. recvbuf_echo--;
  886. memcpy(recvbuf, &recvbuf[1], recvbuf_used);
  887. i = -1;
  888. } else {
  889. uchar* tmpdel = recvbuf;
  890. recvbuf = new uchar[recvbuf_size];
  891. memcpy(recvbuf, tmpdel, i-1);
  892. memcpy(&recvbuf[i-1], &tmpdel[i+1], recvbuf_used-i);
  893. recvbuf_used -= 2;
  894. recvbuf_echo -= 2;
  895. safe_delete(tmpdel);
  896. i -= 2;
  897. }
  898. break;
  899. }
  900. }
  901. }
  902. if (recvbuf_used < 0)
  903. safe_delete_array(recvbuf);
  904. }
  905. return true;
  906. }
  907. bool TCPConnection::ProcessReceivedDataAsPackets(char* errbuf) {
  908. if (errbuf)
  909. errbuf[0] = 0;
  910. sint32 base = 0;
  911. sint32 size = 0;
  912. uchar* buffer;
  913. sint32 sizeReq = sizeof(TCPNetPacket_Struct);
  914. ServerPacket* pack = 0;
  915. while ((recvbuf_used - base) >= size) {
  916. TCPNetPacket_Struct* tnps = (TCPNetPacket_Struct*) &recvbuf[base];
  917. buffer = tnps->buffer;
  918. size = tnps->size;
  919. if (size < sizeReq || recvbuf_used < sizeReq || size >= MaxTCPReceiveBufferSize) {
  920. #if TCPN_DEBUG_Memory >= 1
  921. cout << "TCPConnection[" << GetID() << "]::ProcessReceivedDataAsPackets(): size[" << size << "] >= MaxTCPReceiveBufferSize" << endl;
  922. #endif
  923. if (errbuf)
  924. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(): size provided %i, recvbuf_used %i, checks failed: struct_size < %i || recvbuf_used < sizeReq || size >= MaxTCPReceiveBufferSize", size, recvbuf_used, sizeReq);
  925. return false;
  926. }
  927. if ((recvbuf_used - base) >= size) {
  928. // ok, we got enough data to make this packet!
  929. safe_delete(pack);
  930. pack = new ServerPacket;
  931. pack->size = size - sizeof(TCPNetPacket_Struct);
  932. // read headers
  933. pack->opcode = tnps->opcode;
  934. if (tnps->flags.compressed) {
  935. sizeReq += 4;
  936. if(size < sizeReq || recvbuf_used < sizeReq)
  937. {
  938. if (errbuf)
  939. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(Flags.Compressed): size provided %i, recvbuf_used %i, checks failed: struct_size < %i || recvbuf_used < sizeReq", size, recvbuf_used, sizeReq);
  940. safe_delete(pack);
  941. return false;
  942. }
  943. pack->compressed = true;
  944. pack->InflatedSize = *((sint32*)buffer);
  945. pack->size -= 4;
  946. buffer += 4;
  947. }
  948. if (tnps->flags.destination) {
  949. sizeReq += 4;
  950. if(size < sizeReq || recvbuf_used < sizeReq)
  951. {
  952. if (errbuf)
  953. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(Flags.Destination): size provided %i, recvbuf_used %i, checks failed: struct_size < %i || recvbuf_used < sizeReq", size, recvbuf_used, sizeReq);
  954. safe_delete(pack);
  955. return false;
  956. }
  957. pack->destination = *((sint32*)buffer);
  958. pack->size -= 4;
  959. buffer += 4;
  960. }
  961. // end read headers
  962. if (pack->size > 0) {
  963. if (tnps->flags.compressed) {
  964. // Lets decompress the packet here
  965. pack->compressed = false;
  966. if(pack->InflatedSize < MaxTCPReceiveBufferSize)
  967. {
  968. pack->pBuffer = new uchar[pack->InflatedSize];
  969. pack->size = InflatePacket(buffer, pack->size, pack->pBuffer, pack->InflatedSize);
  970. if(!pack->size)
  971. {
  972. if (errbuf)
  973. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(InflatePacket): size provided %i, recvbuf_used: %i, sizeReq: %i, could not inflate packet", size, recvbuf_used, sizeReq);
  974. safe_delete(pack);
  975. return false;
  976. }
  977. }
  978. else
  979. {
  980. cout << "Invalid inflated packet." << endl;
  981. safe_delete(pack);
  982. return false;
  983. }
  984. }
  985. else {
  986. pack->pBuffer = new uchar[pack->size];
  987. memcpy(pack->pBuffer, buffer, pack->size);
  988. }
  989. }
  990. if (pack->opcode == 0) {
  991. if (pack->size) {
  992. #if TCPN_DEBUG >= 2
  993. cout << "Received TCP Network layer packet" << endl;
  994. #endif
  995. ProcessNetworkLayerPacket(pack);
  996. }
  997. #if TCPN_DEBUG >= 5
  998. else {
  999. cout << "Received TCP keepalive packet. (opcode=0)" << endl;
  1000. }
  1001. #endif
  1002. }
  1003. else {
  1004. #if TCPN_LOG_PACKETS >= 1
  1005. if (pack && pack->opcode != 0) {
  1006. struct in_addr in;
  1007. in.s_addr = GetrIP();
  1008. CoutTimestamp(true);
  1009. cout << ": Logging incoming TCP packet. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
  1010. #if TCPN_LOG_PACKETS == 2
  1011. if (pack->size >= 32)
  1012. DumpPacket(pack->pBuffer, 32);
  1013. else
  1014. DumpPacket(pack);
  1015. #endif
  1016. #if TCPN_LOG_PACKETS >= 3
  1017. DumpPacket(pack);
  1018. #endif
  1019. }
  1020. #endif
  1021. if (RelayServer && Server && pack->destination) {
  1022. TCPConnection* con = Server->GetConnection(pack->destination);
  1023. if (!con) {
  1024. #if TCPN_DEBUG >= 1
  1025. cout << "Error relaying packet: con = 0" << endl;
  1026. #endif
  1027. }
  1028. else{
  1029. con->OutQueuePush(pack);
  1030. pack = 0;
  1031. }
  1032. }
  1033. else{
  1034. OutQueuePush(pack);
  1035. pack = 0;
  1036. }
  1037. }
  1038. base += size;
  1039. size = 7;
  1040. }
  1041. }
  1042. safe_delete(pack);
  1043. if (base != 0) {
  1044. if (base >= recvbuf_used) {
  1045. safe_delete_array(recvbuf);
  1046. }
  1047. else {
  1048. uchar* tmpbuf = new uchar[recvbuf_size - base];
  1049. memcpy(tmpbuf, &recvbuf[base], recvbuf_used - base);
  1050. safe_delete_array(recvbuf);
  1051. recvbuf = tmpbuf;
  1052. recvbuf_used -= base;
  1053. recvbuf_size -= base;
  1054. }
  1055. }
  1056. return true;
  1057. }
  1058. bool TCPConnection::ProcessReceivedDataAsOldPackets(char* errbuf) {
  1059. sint32 base = 0;
  1060. sint32 size = 4;
  1061. uchar* buffer;
  1062. ServerPacket* pack = 0;
  1063. while ((recvbuf_used - base) >= size) {
  1064. buffer = &recvbuf[base];
  1065. memcpy(&size, &buffer[2], 2);
  1066. if (size >= MaxTCPReceiveBufferSize) {
  1067. #if TCPN_DEBUG_Memory >= 1
  1068. cout << "TCPConnection[" << GetID() << "]::ProcessReceivedDataAsPackets(): size[" << size << "] >= MaxTCPReceiveBufferSize" << endl;
  1069. #endif
  1070. if (errbuf)
  1071. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(): size >= MaxTCPReceiveBufferSize");
  1072. return false;
  1073. }
  1074. if ((recvbuf_used - base) >= size) {
  1075. // ok, we got enough data to make this packet!
  1076. pack = new ServerPacket;
  1077. memcpy(&pack->opcode, &buffer[0], 2);
  1078. pack->size = size - 4;
  1079. LogWrite(MISC__TODO, 1, "TODO", "Checksum or size check or something similar\n\t(%s, function: %s, line #: %i)", __FILE__, __FUNCTION__, __LINE__);
  1080. /*
  1081. if () { // TODO: Checksum or size check or something similar
  1082. // Datastream corruption, get the hell outta here!
  1083. delete pack;
  1084. return false;
  1085. }
  1086. */
  1087. if (pack->size > 0) {
  1088. pack->pBuffer = new uchar[pack->size];
  1089. memcpy(pack->pBuffer, &buffer[4], pack->size);
  1090. }
  1091. if (pack->opcode == 0) {
  1092. // keepalive, no need to process
  1093. safe_delete(pack);
  1094. }
  1095. else {
  1096. #if TCPN_LOG_PACKETS >= 1
  1097. if (pack && pack->opcode != 0) {
  1098. struct in_addr in;
  1099. in.s_addr = GetrIP();
  1100. CoutTimestamp(true);
  1101. cout << ": Logging incoming TCP OldPacket. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
  1102. #if TCPN_LOG_PACKETS == 2
  1103. if (pack->size >= 32)
  1104. DumpPacket(pack->pBuffer, 32);
  1105. else
  1106. DumpPacket(pack);
  1107. #endif
  1108. #if TCPN_LOG_PACKETS >= 3
  1109. DumpPacket(pack);
  1110. #endif
  1111. }
  1112. #endif
  1113. OutQueuePush(pack);
  1114. }
  1115. base += size;
  1116. size = 4;
  1117. }
  1118. }
  1119. if (base != 0) {
  1120. if (base >= recvbuf_used) {
  1121. safe_delete_array(recvbuf);
  1122. }
  1123. else {
  1124. uchar* tmpbuf = new uchar[recvbuf_size - base];
  1125. memcpy(tmpbuf, &recvbuf[base], recvbuf_used - base);
  1126. safe_delete_array(recvbuf);
  1127. recvbuf = tmpbuf;
  1128. recvbuf_used -= base;
  1129. recvbuf_size -= base;
  1130. }
  1131. }
  1132. return true;
  1133. }
  1134. void TCPConnection::ProcessNetworkLayerPacket(ServerPacket* pack) {
  1135. int8 opcode = pack->pBuffer[0];
  1136. /** disabling RELAY capabilities, this functionality is poorly implemented
  1137. even if such a feature needs to be re-used need authentication BEFORE allowing relay to take place
  1138. secondly we need to protect the LS accepting new connections as bogus data can be passed to open
  1139. fake TCP connections
  1140. opcode 0 is OK, that is Keep-Alive
  1141. **/
  1142. if (opcode > 0)
  1143. {
  1144. Disconnect();
  1145. return;
  1146. }
  1147. int8* data = &pack->pBuffer[1];
  1148. switch (opcode) {
  1149. case 0: {
  1150. break;
  1151. }
  1152. case 1: { // Switch to RelayServer mode
  1153. if (pack->size != 1) {
  1154. SendNetErrorPacket("New RelayClient: wrong size, expected 1");
  1155. break;
  1156. }
  1157. if (RelayServer) {
  1158. SendNetErrorPacket("Switch to RelayServer mode when already in RelayServer mode");
  1159. break;
  1160. }
  1161. if (RemoteID) {
  1162. SendNetErrorPacket("Switch to RelayServer mode by a Relay Client");
  1163. break;
  1164. }
  1165. if (ConnectionType != Incomming) {
  1166. SendNetErrorPacket("Switch to RelayServer mode on outgoing connection");
  1167. break;
  1168. }
  1169. #if TCPC_DEBUG >= 3
  1170. struct in_addr in;
  1171. in.s_addr = GetrIP();
  1172. cout << "Switching to RelayServer mode: " << inet_ntoa(in) << ":" << GetPort() << endl;
  1173. #endif
  1174. RelayServer = true;
  1175. break;
  1176. }
  1177. case 2: { // New Relay Client
  1178. if (!RelayServer) {
  1179. SendNetErrorPacket("New RelayClient when not in RelayServer mode");
  1180. break;
  1181. }
  1182. if (pack->size != 11) {
  1183. SendNetErrorPacket("New RelayClient: wrong size, expected 11");
  1184. break;
  1185. }
  1186. if (ConnectionType != Incomming) {
  1187. SendNetErrorPacket("New RelayClient: illegal on outgoing connection");
  1188. break;
  1189. }
  1190. TCPConnection* con = new TCPConnection(Server, this, *((int32*) data), *((int32*) &data[4]), *((int16*) &data[8]));
  1191. Server->AddConnection(con);
  1192. RelayCount++;
  1193. break;
  1194. }
  1195. case 3: { // Delete Relay Client
  1196. if (!RelayServer) {
  1197. SendNetErrorPacket("Delete RelayClient when not in RelayServer mode");
  1198. break;
  1199. }
  1200. if (pack->size != 5) {
  1201. SendNetErrorPacket("Delete RelayClient: wrong size, expected 5");
  1202. break;
  1203. }
  1204. TCPConnection* con = Server->GetConnection(*((int32*)data));
  1205. if (con) {
  1206. if (ConnectionType == Incomming) {
  1207. if (con->GetRelayLink() != this) {
  1208. SendNetErrorPacket("Delete RelayClient: RelayLink != this");
  1209. break;
  1210. }
  1211. }
  1212. con->Disconnect(false);
  1213. }
  1214. break;
  1215. }
  1216. case 255: {
  1217. #if TCPC_DEBUG >= 1
  1218. struct in_addr in;
  1219. in.s_addr = GetrIP();
  1220. cout "Received NetError: '";
  1221. if (pack->size > 1)
  1222. cout << (char*) data;
  1223. cout << "': " << inet_ntoa(in) << ":" << GetPort() << endl;
  1224. #endif
  1225. break;
  1226. }
  1227. }
  1228. }
  1229. void TCPConnection::SendNetErrorPacket(const char* reason) {
  1230. #if TCPC_DEBUG >= 1
  1231. struct in_addr in;
  1232. in.s_addr = GetrIP();
  1233. cout "NetError: '";
  1234. if (reason)
  1235. cout << reason;
  1236. cout << "': " << inet_ntoa(in) << ":" << GetPort() << endl;
  1237. #endif
  1238. ServerPacket* pack = new ServerPacket(0);
  1239. pack->size = 1;
  1240. if (reason)
  1241. pack->size += strlen(reason) + 1;
  1242. pack->pBuffer = new uchar[pack->size];
  1243. memset(pack->pBuffer, 0, pack->size);
  1244. pack->pBuffer[0] = 255;
  1245. strcpy((char*) &pack->pBuffer[1], reason);
  1246. SendPacket(pack);
  1247. safe_delete(pack);
  1248. }
  1249. void TCPConnection::RemoveRelay(TCPConnection* relay, bool iSendRelayDisconnect) {
  1250. if (iSendRelayDisconnect) {
  1251. ServerPacket* pack = new ServerPacket(0, 5);
  1252. pack->pBuffer[0] = 3;
  1253. *((int32*) &pack->pBuffer[1]) = relay->GetRemoteID();
  1254. SendPacket(pack);
  1255. safe_delete(pack);
  1256. }
  1257. RelayCount--;
  1258. }
  1259. bool TCPConnection::SendData(char* errbuf) {
  1260. if (errbuf)
  1261. errbuf[0] = 0;
  1262. /************ Get first send packet on queue and send it! ************/
  1263. uchar* data = 0;
  1264. sint32 size = 0;
  1265. int status = 0;
  1266. if (ServerSendQueuePop(&data, &size)) {
  1267. #ifdef WIN32
  1268. status = send(connection_socket, (const char *) data, size, 0);
  1269. #else
  1270. status = send(connection_socket, data, size, MSG_NOSIGNAL);
  1271. if(errno==EPIPE) status = SOCKET_ERROR;
  1272. #endif
  1273. if (status >= 1) {
  1274. #if TCPN_LOG_RAW_DATA_OUT >= 1
  1275. struct in_addr in;
  1276. in.s_addr = GetrIP();
  1277. CoutTimestamp(true);
  1278. cout << ": Wrote " << status << " bytes to network. " << inet_ntoa(in) << ":" << GetrPort();
  1279. if (pOldFormat)
  1280. cout << " (OldFormat)";
  1281. cout << endl;
  1282. #if TCPN_LOG_RAW_DATA_OUT == 2
  1283. sint32 tmp = status;
  1284. if (tmp > 32)
  1285. tmp = 32;
  1286. DumpPacket(data, status);
  1287. #elif TCPN_LOG_RAW_DATA_OUT >= 3
  1288. DumpPacket(data, status);
  1289. #endif
  1290. #endif
  1291. keepalive_timer->Start();
  1292. if (status < (signed)size) {
  1293. #if TCPN_LOG_RAW_DATA_OUT >= 1
  1294. struct in_addr in;
  1295. in.s_addr = GetrIP();
  1296. CoutTimestamp(true);
  1297. cout << ": Pushed " << (size - status) << " bytes back onto the send queue. " << inet_ntoa(in) << ":" << GetrPort();
  1298. if (pOldFormat)
  1299. cout << " (OldFormat)";
  1300. cout << endl;
  1301. #endif
  1302. // If there's network congestion, the number of bytes sent can be less than
  1303. // what we tried to give it... Push the extra back on the queue for later
  1304. ServerSendQueuePushFront(&data[status], size - status);
  1305. }
  1306. else if (status > (signed)size) {
  1307. ThrowError("TCPConnection::SendData(): WTF! status > size");
  1308. return false;
  1309. }
  1310. // else if (status == size) {}
  1311. }
  1312. else {
  1313. ServerSendQueuePushFront(data, size);
  1314. }
  1315. safe_delete_array(data);
  1316. if (status == SOCKET_ERROR) {
  1317. #ifdef WIN32
  1318. if (WSAGetLastError() != WSAEWOULDBLOCK)
  1319. #else
  1320. if (errno != EWOULDBLOCK)
  1321. #endif
  1322. {
  1323. if (errbuf) {
  1324. #ifdef WIN32
  1325. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::SendData(): send(): Errorcode: %i", WSAGetLastError());
  1326. #else
  1327. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::SendData(): send(): Errorcode: %s", strerror(errno));
  1328. #endif
  1329. }
  1330. return false;
  1331. }
  1332. }
  1333. }
  1334. if (TCPMode == modePacket && keepalive_timer->Check()) {
  1335. ServerPacket* pack = new ServerPacket(0, 0);
  1336. SendPacket(pack);
  1337. safe_delete(pack);
  1338. #if TCPN_DEBUG >= 5
  1339. cout << "Sending TCP keepalive packet. (timeout=" << timeout_timer->GetRemainingTime() << " remaining)" << endl;
  1340. #endif
  1341. }
  1342. return true;
  1343. }
  1344. ThreadReturnType TCPConnectionLoop(void* tmp) {
  1345. #ifdef WIN32
  1346. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  1347. #endif
  1348. if (tmp == 0) {
  1349. ThrowError("TCPConnectionLoop(): tmp = 0!");
  1350. THREAD_RETURN(NULL);
  1351. }
  1352. TCPConnection* tcpc = (TCPConnection*) tmp;
  1353. tcpc->MLoopRunning.lock();
  1354. while (tcpc->RunLoop()) {
  1355. Sleep(LOOP_GRANULARITY);
  1356. if (tcpc->GetState() != TCPS_Ready) {
  1357. if (!tcpc->Process()) {
  1358. tcpc->Disconnect();
  1359. }
  1360. }
  1361. else if (tcpc->GetAsyncConnect()) {
  1362. if (tcpc->charAsyncConnect)
  1363. tcpc->Connect(tcpc->charAsyncConnect, tcpc->GetrPort());
  1364. else
  1365. tcpc->Connect(tcpc->GetrIP(), tcpc->GetrPort());
  1366. tcpc->SetAsyncConnect(false);
  1367. }
  1368. else
  1369. Sleep(10);
  1370. }
  1371. tcpc->MLoopRunning.unlock();
  1372. THREAD_RETURN(NULL);
  1373. }
  1374. bool TCPConnection::RunLoop() {
  1375. bool ret;
  1376. MRunLoop.lock();
  1377. ret = pRunLoop;
  1378. MRunLoop.unlock();
  1379. return ret;
  1380. }
  1381. TCPServer::TCPServer(int16 in_port, bool iOldFormat) {
  1382. NextID = 1;
  1383. pPort = in_port;
  1384. sock = 0;
  1385. pOldFormat = iOldFormat;
  1386. list = new LinkedList<TCPConnection*>;
  1387. pRunLoop = true;
  1388. #ifdef WIN32
  1389. _beginthread(TCPServerLoop, 0, this);
  1390. #else
  1391. pthread_t thread;
  1392. pthread_create(&thread, NULL, &TCPServerLoop, this);
  1393. pthread_detach(thread);
  1394. #endif
  1395. }
  1396. TCPServer::~TCPServer() {
  1397. MRunLoop.lock();
  1398. pRunLoop = false;
  1399. MRunLoop.unlock();
  1400. MLoopRunning.lock();
  1401. MLoopRunning.unlock();
  1402. while (NewQueue.pop()); // the objects are deleted with the list, clear this queue so it doesnt try to delete them again
  1403. safe_delete(list);
  1404. }
  1405. bool TCPServer::RunLoop() {
  1406. bool ret;
  1407. MRunLoop.lock();
  1408. ret = pRunLoop;
  1409. MRunLoop.unlock();
  1410. return ret;
  1411. }
  1412. ThreadReturnType TCPServerLoop(void* tmp) {
  1413. #ifdef WIN32
  1414. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  1415. #endif
  1416. if (tmp == 0) {
  1417. ThrowError("TCPServerLoop(): tmp = 0!");
  1418. THREAD_RETURN(NULL);
  1419. }
  1420. TCPServer* tcps = (TCPServer*) tmp;
  1421. tcps->MLoopRunning.lock();
  1422. while (tcps->RunLoop()) {
  1423. Sleep(SERVER_LOOP_GRANULARITY);
  1424. tcps->Process();
  1425. }
  1426. tcps->MLoopRunning.unlock();
  1427. THREAD_RETURN(NULL);
  1428. }
  1429. void TCPServer::Process() {
  1430. CheckInQueue();
  1431. ListenNewConnections();
  1432. LinkedListIterator<TCPConnection*> iterator(*list);
  1433. iterator.Reset();
  1434. while(iterator.MoreElements()) {
  1435. if (iterator.GetData()->IsFree() && (!iterator.GetData()->CheckNetActive())) {
  1436. #if _DEBUG
  1437. LogWrite(NET__DEBUG, 0, "Net", "EQStream Connection deleted.");
  1438. #endif
  1439. iterator.RemoveCurrent();
  1440. }
  1441. else {
  1442. if (!iterator.GetData()->Process())
  1443. iterator.GetData()->Disconnect();
  1444. iterator.Advance();
  1445. }
  1446. }
  1447. }
  1448. void TCPServer::ListenNewConnections() {
  1449. SOCKET tmpsock;
  1450. struct sockaddr_in from;
  1451. struct in_addr in;
  1452. unsigned int fromlen;
  1453. TCPConnection* con;
  1454. from.sin_family = AF_INET;
  1455. fromlen = sizeof(from);
  1456. LockMutex lock(&MSock);
  1457. if (!sock)
  1458. return;
  1459. // Check for pending connects
  1460. #ifdef WIN32
  1461. unsigned long nonblocking = 1;
  1462. while ((tmpsock = accept(sock, (struct sockaddr*) &from, (int *) &fromlen)) != INVALID_SOCKET) {
  1463. ioctlsocket (tmpsock, FIONBIO, &nonblocking);
  1464. #else
  1465. while ((tmpsock = accept(sock, (struct sockaddr*) &from, &fromlen)) != INVALID_SOCKET) {
  1466. fcntl(tmpsock, F_SETFL, O_NONBLOCK);
  1467. #endif
  1468. int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
  1469. setsockopt(tmpsock, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
  1470. in.s_addr = from.sin_addr.s_addr;
  1471. // New TCP connection
  1472. con = new TCPConnection(this, tmpsock, in.s_addr, ntohs(from.sin_port), pOldFormat);
  1473. #if TCPN_DEBUG >= 1
  1474. cout << "New TCP connection: " << inet_ntoa(in) << ":" << con->GetrPort() << endl;
  1475. #endif
  1476. AddConnection(con);
  1477. }
  1478. }
  1479. bool TCPServer::Open(int16 in_port, char* errbuf) {
  1480. if (errbuf)
  1481. errbuf[0] = 0;
  1482. LockMutex lock(&MSock);
  1483. if (sock != 0) {
  1484. if (errbuf)
  1485. snprintf(errbuf, TCPConnection_ErrorBufferSize, "Listening socket already open");
  1486. return false;
  1487. }
  1488. if (in_port != 0) {
  1489. pPort = in_port;
  1490. }
  1491. #ifdef WIN32
  1492. SOCKADDR_IN address;
  1493. unsigned long nonblocking = 1;
  1494. #else
  1495. struct sockaddr_in address;
  1496. #endif
  1497. int reuse_addr = 1;
  1498. // Setup internet address information.
  1499. // This is used with the bind() call
  1500. memset((char *) &address, 0, sizeof(address));
  1501. address.sin_family = AF_INET;
  1502. address.sin_port = htons(pPort);
  1503. address.sin_addr.s_addr = htonl(INADDR_ANY);
  1504. // Setting up TCP port for new TCP connections
  1505. sock = socket(AF_INET, SOCK_STREAM, 0);
  1506. if (sock == INVALID_SOCKET) {
  1507. if (errbuf)
  1508. snprintf(errbuf, TCPConnection_ErrorBufferSize, "socket(): INVALID_SOCKET");
  1509. return false;
  1510. }
  1511. // Quag: dont think following is good stuff for TCP, good for UDP
  1512. // Mis: SO_REUSEADDR shouldn't be a problem for tcp--allows you to restart
  1513. // without waiting for conns in TIME_WAIT to die
  1514. setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse_addr, sizeof(reuse_addr));
  1515. if (::bind(sock, (struct sockaddr *) &address, sizeof(address)) < 0) {
  1516. #ifdef WIN32
  1517. closesocket(sock);
  1518. #else
  1519. close(sock);
  1520. #endif
  1521. sock = 0;
  1522. if (errbuf)
  1523. sprintf(errbuf, "bind(): <0");
  1524. return false;
  1525. }
  1526. int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
  1527. setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
  1528. #ifdef WIN32
  1529. ioctlsocket (sock, FIONBIO, &nonblocking);
  1530. #else
  1531. fcntl(sock, F_SETFL, O_NONBLOCK);
  1532. #endif
  1533. if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
  1534. #ifdef WIN32
  1535. closesocket(sock);
  1536. if (errbuf)
  1537. snprintf(errbuf, TCPConnection_ErrorBufferSize, "listen() failed, Error: %d", WSAGetLastError());
  1538. #else
  1539. close(sock);
  1540. if (errbuf)
  1541. snprintf(errbuf, TCPConnection_ErrorBufferSize, "listen() failed, Error: %s", strerror(errno));
  1542. #endif
  1543. sock = 0;
  1544. return false;
  1545. }
  1546. return true;
  1547. }
  1548. void TCPServer::Close() {
  1549. LockMutex lock(&MSock);
  1550. if (sock) {
  1551. #ifdef WIN32
  1552. closesocket(sock);
  1553. #else
  1554. close(sock);
  1555. #endif
  1556. }
  1557. sock = 0;
  1558. }
  1559. bool TCPServer::IsOpen() {
  1560. MSock.lock();
  1561. bool ret = (bool) (sock != 0);
  1562. MSock.unlock();
  1563. return ret;
  1564. }
  1565. TCPConnection* TCPServer::NewQueuePop() {
  1566. TCPConnection* ret;
  1567. MNewQueue.lock();
  1568. ret = NewQueue.pop();
  1569. MNewQueue.unlock();
  1570. return ret;
  1571. }
  1572. void TCPServer::AddConnection(TCPConnection* con) {
  1573. list->Append(con);
  1574. MNewQueue.lock();
  1575. NewQueue.push(con);
  1576. MNewQueue.unlock();
  1577. }
  1578. TCPConnection* TCPServer::GetConnection(int32 iID) {
  1579. LinkedListIterator<TCPConnection*> iterator(*list);
  1580. iterator.Reset();
  1581. while(iterator.MoreElements()) {
  1582. if (iterator.GetData()->GetID() == iID)
  1583. return iterator.GetData();
  1584. iterator.Advance();
  1585. }
  1586. return 0;
  1587. }
  1588. void TCPServer::SendPacket(ServerPacket* pack) {
  1589. TCPConnection::TCPNetPacket_Struct* tnps = TCPConnection::MakePacket(pack);
  1590. SendPacket(&tnps);
  1591. }
  1592. void TCPServer::SendPacket(TCPConnection::TCPNetPacket_Struct** tnps) {
  1593. MInQueue.lock();
  1594. InQueue.push(*tnps);
  1595. MInQueue.unlock();
  1596. tnps = 0;
  1597. }
  1598. void TCPServer::CheckInQueue() {
  1599. LinkedListIterator<TCPConnection*> iterator(*list);
  1600. TCPConnection::TCPNetPacket_Struct* tnps = 0;
  1601. while (( tnps = InQueuePop() )) {
  1602. iterator.Reset();
  1603. while(iterator.MoreElements()) {
  1604. if (iterator.GetData()->GetMode() != modeConsole && iterator.GetData()->GetRemoteID() == 0)
  1605. iterator.GetData()->SendPacket(tnps);
  1606. iterator.Advance();
  1607. }
  1608. safe_delete(tnps);
  1609. }
  1610. }
  1611. TCPConnection::TCPNetPacket_Struct* TCPServer::InQueuePop() {
  1612. TCPConnection::TCPNetPacket_Struct* ret;
  1613. MInQueue.lock();
  1614. ret = InQueue.pop();
  1615. MInQueue.unlock();
  1616. return ret;
  1617. }