TCPConnection.cpp 42 KB


  1. /*
  2. EQ2Emulator: Everquest II Server Emulator
  3. Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net)
  4. This file is part of EQ2Emulator.
  5. EQ2Emulator is free software: you can redistribute it and/or modify
  6. it under the terms of the GNU General Public License as published by
  7. the Free Software Foundation, either version 3 of the License, or
  8. (at your option) any later version.
  9. EQ2Emulator is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. GNU General Public License for more details.
  13. You should have received a copy of the GNU General Public License
  14. along with EQ2Emulator. If not, see <http://www.gnu.org/licenses/>.
  15. */
  16. #include "../common/debug.h"
  17. #include <iostream>
  18. using namespace std;
  19. #include <string.h>
  20. #include <stdio.h>
  21. #include <iomanip>
  22. using namespace std;
  23. #include "TCPConnection.h"
  24. #include "../common/servertalk.h"
  25. #include "../common/timer.h"
  26. #include "../common/packet_dump.h"
  27. #include "Log.h"
  28. #ifdef FREEBSD //Timothy Whitman - January 7, 2003
  29. #define MSG_NOSIGNAL 0
  30. #endif
  31. #ifdef WIN32
  32. InitWinsock winsock;
  33. #endif
  34. #define LOOP_GRANULARITY 3 //# of ms between checking our socket/queues
  35. #define SERVER_LOOP_GRANULARITY 3 //# of ms between checking our socket/queues
  36. #define TCPN_DEBUG 0
  37. #define TCPN_DEBUG_Console 0
  38. #define TCPN_DEBUG_Memory 0
  39. #define TCPN_LOG_PACKETS 0
  40. #define TCPN_LOG_RAW_DATA_OUT 0
  41. #define TCPN_LOG_RAW_DATA_IN 0
  42. TCPConnection::TCPNetPacket_Struct* TCPConnection::MakePacket(ServerPacket* pack, int32 iDestination) {
  43. sint32 size = sizeof(TCPNetPacket_Struct) + pack->size;
  44. if (pack->compressed) {
  45. size += 4;
  46. }
  47. if (iDestination) {
  48. size += 4;
  49. }
  50. TCPNetPacket_Struct* tnps = (TCPNetPacket_Struct*) new uchar[size];
  51. tnps->size = size;
  52. tnps->opcode = pack->opcode;
  53. *((int8*) &tnps->flags) = 0;
  54. uchar* buffer = tnps->buffer;
  55. if (pack->compressed) {
  56. tnps->flags.compressed = 1;
  57. *((sint32*) buffer) = pack->InflatedSize;
  58. buffer += 4;
  59. }
  60. if (iDestination) {
  61. tnps->flags.destination = 1;
  62. *((sint32*) buffer) = iDestination;
  63. buffer += 4;
  64. }
  65. memcpy(buffer, pack->pBuffer, pack->size);
  66. return tnps;
  67. }
  68. TCPConnection::TCPConnection(bool iOldFormat, TCPServer* iRelayServer, eTCPMode iMode) {
  69. id = 0;
  70. Server = iRelayServer;
  71. if (Server)
  72. RelayServer = true;
  73. else
  74. RelayServer = false;
  75. RelayLink = 0;
  76. RelayCount = 0;
  77. RemoteID = 0;
  78. pOldFormat = iOldFormat;
  79. ConnectionType = Outgoing;
  80. TCPMode = iMode;
  81. pState = TCPS_Ready;
  82. pFree = false;
  83. pEcho = false;
  84. sock = 0;
  85. rIP = 0;
  86. rPort = 0;
  87. keepalive_timer = new Timer(SERVER_TIMEOUT);
  88. timeout_timer = new Timer(SERVER_TIMEOUT * 2);
  89. recvbuf = 0;
  90. sendbuf = 0;
  91. pRunLoop = false;
  92. charAsyncConnect = 0;
  93. pAsyncConnect = false;
  94. connection_socket = 0;
  95. recvbuf_size = 0;
  96. recvbuf_used = 0;
  97. recvbuf_echo = 0;
  98. sendbuf_size = 0;
  99. sendbuf_used = 0;
  100. #if TCPN_DEBUG_Memory >= 7
  101. cout << "Constructor #1 on outgoing TCP# " << GetID() << endl;
  102. #endif
  103. }
  104. TCPConnection::TCPConnection(TCPServer* iServer, SOCKET in_socket, int32 irIP, int16 irPort, bool iOldFormat) {
  105. Server = iServer;
  106. RelayLink = 0;
  107. RelayServer = false;
  108. RelayCount = 0;
  109. RemoteID = 0;
  110. id = Server->GetNextID();
  111. ConnectionType = Incomming;
  112. pOldFormat = iOldFormat;
  113. TCPMode = modePacket;
  114. pState = TCPS_Connected;
  115. pFree = false;
  116. pEcho = false;
  117. sock = 0;
  118. connection_socket = in_socket;
  119. rIP = irIP;
  120. rPort = irPort;
  121. keepalive_timer = new Timer(SERVER_TIMEOUT);
  122. timeout_timer = new Timer(SERVER_TIMEOUT * 2);
  123. recvbuf = 0;
  124. sendbuf = 0;
  125. pRunLoop = false;
  126. charAsyncConnect = 0;
  127. pAsyncConnect = false;
  128. recvbuf_size = 0;
  129. recvbuf_used = 0;
  130. recvbuf_echo = 0;
  131. sendbuf_size = 0;
  132. sendbuf_used = 0;
  133. #if TCPN_DEBUG_Memory >= 7
  134. cout << "Constructor #2 on outgoing TCP# " << GetID() << endl;
  135. #endif
  136. }
  137. TCPConnection::TCPConnection(TCPServer* iServer, TCPConnection* iRelayLink, int32 iRemoteID, int32 irIP, int16 irPort) {
  138. Server = iServer;
  139. RelayLink = iRelayLink;
  140. RelayServer = true;
  141. id = Server->GetNextID();
  142. RelayCount = 0;
  143. RemoteID = iRemoteID;
  144. if (!RemoteID)
  145. ThrowError("Error: TCPConnection: RemoteID == 0 on RelayLink constructor");
  146. pOldFormat = false;
  147. ConnectionType = Incomming;
  148. TCPMode = modePacket;
  149. pState = TCPS_Connected;
  150. pFree = false;
  151. pEcho = false;
  152. sock = 0;
  153. connection_socket = 0;
  154. rIP = irIP;
  155. rPort = irPort;
  156. keepalive_timer = 0;
  157. timeout_timer = 0;
  158. recvbuf = 0;
  159. sendbuf = 0;
  160. pRunLoop = false;
  161. charAsyncConnect = 0;
  162. pAsyncConnect = false;
  163. recvbuf_size = 0;
  164. recvbuf_used = 0;
  165. recvbuf_echo = 0;
  166. sendbuf_size = 0;
  167. sendbuf_used = 0;
  168. #if TCPN_DEBUG_Memory >= 7
  169. cout << "Constructor #3 on outgoing TCP# " << GetID() << endl;
  170. #endif
  171. }
  172. TCPConnection::~TCPConnection() {
  173. Disconnect();
  174. ClearBuffers();
  175. if (ConnectionType == Outgoing) {
  176. MRunLoop.lock();
  177. pRunLoop = false;
  178. MRunLoop.unlock();
  179. MLoopRunning.lock();
  180. MLoopRunning.unlock();
  181. #if TCPN_DEBUG_Memory >= 6
  182. cout << "Deconstructor on outgoing TCP# " << GetID() << endl;
  183. #endif
  184. }
  185. #if TCPN_DEBUG_Memory >= 5
  186. else {
  187. cout << "Deconstructor on incomming TCP# " << GetID() << endl;
  188. }
  189. #endif
  190. safe_delete(keepalive_timer);
  191. safe_delete(timeout_timer);
  192. safe_delete_array(recvbuf);
  193. safe_delete_array(sendbuf);
  194. safe_delete_array(charAsyncConnect);
  195. }
  196. void TCPConnection::SetState(int8 in_state) {
  197. MState.lock();
  198. pState = in_state;
  199. MState.unlock();
  200. }
  201. int8 TCPConnection::GetState() {
  202. int8 ret;
  203. MState.lock();
  204. ret = pState;
  205. MState.unlock();
  206. return ret;
  207. }
  208. void TCPConnection::Free() {
  209. if (ConnectionType == Outgoing) {
  210. ThrowError("TCPConnection::Free() called on an Outgoing connection");
  211. }
  212. #if TCPN_DEBUG_Memory >= 5
  213. cout << "Free on TCP# " << GetID() << endl;
  214. #endif
  215. Disconnect();
  216. pFree = true;
  217. }
  218. bool TCPConnection::SendPacket(ServerPacket* pack, int32 iDestination) {
  219. LockMutex lock(&MState);
  220. if (!Connected())
  221. return false;
  222. eTCPMode tmp = GetMode();
  223. if (tmp != modePacket && tmp != modeTransition)
  224. return false;
  225. if (RemoteID)
  226. return RelayLink->SendPacket(pack, RemoteID);
  227. else {
  228. TCPNetPacket_Struct* tnps = MakePacket(pack, iDestination);
  229. if (tmp == modeTransition) {
  230. InModeQueuePush(tnps);
  231. }
  232. else {
  233. #if TCPN_LOG_PACKETS >= 1
  234. if (pack && pack->opcode != 0) {
  235. struct in_addr in;
  236. in.s_addr = GetrIP();
  237. CoutTimestamp(true);
  238. cout << ": Logging outgoing TCP packet. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
  239. #if TCPN_LOG_PACKETS == 2
  240. if (pack->size >= 32)
  241. DumpPacket(pack->pBuffer, 32);
  242. else
  243. DumpPacket(pack);
  244. #endif
  245. #if TCPN_LOG_PACKETS >= 3
  246. DumpPacket(pack);
  247. #endif
  248. }
  249. #endif
  250. ServerSendQueuePushEnd((uchar**) &tnps, tnps->size);
  251. }
  252. }
  253. return true;
  254. }
  255. bool TCPConnection::SendPacket(TCPNetPacket_Struct* tnps) {
  256. LockMutex lock(&MState);
  257. if (RemoteID)
  258. return false;
  259. if (!Connected())
  260. return false;
  261. eTCPMode tmp = GetMode();
  262. if (tmp == modeTransition) {
  263. TCPNetPacket_Struct* tnps2 = (TCPNetPacket_Struct*) new uchar[tnps->size];
  264. memcpy(tnps2, tnps, tnps->size);
  265. InModeQueuePush(tnps2);
  266. return true;
  267. }
  268. if (GetMode() != modePacket)
  269. return false;
  270. #if TCPN_LOG_PACKETS >= 1
  271. if (tnps && tnps->opcode != 0) {
  272. struct in_addr in;
  273. in.s_addr = GetrIP();
  274. CoutTimestamp(true);
  275. cout << ": Logging outgoing TCP NetPacket. OPCode: 0x" << hex << setw(4) << setfill('0') << tnps->opcode << dec << ", size: " << setw(5) << setfill(' ') << tnps->size << " " << inet_ntoa(in) << ":" << GetrPort();
  276. if (pOldFormat)
  277. cout << " (OldFormat)";
  278. cout << endl;
  279. #if TCPN_LOG_PACKETS == 2
  280. if (tnps->size >= 32)
  281. DumpPacket((uchar*) tnps, 32);
  282. else
  283. DumpPacket((uchar*) tnps, tnps->size);
  284. #endif
  285. #if TCPN_LOG_PACKETS >= 3
  286. DumpPacket((uchar*) tnps, tnps->size);
  287. #endif
  288. }
  289. #endif
  290. ServerSendQueuePushEnd((const uchar*) tnps, tnps->size);
  291. return true;
  292. }
  293. bool TCPConnection::Send(const uchar* data, sint32 size) {
  294. if (!Connected())
  295. return false;
  296. if (GetMode() != modeConsole)
  297. return false;
  298. if (!size)
  299. return true;
  300. ServerSendQueuePushEnd(data, size);
  301. return true;
  302. }
  303. void TCPConnection::InModeQueuePush(TCPNetPacket_Struct* tnps) {
  304. MSendQueue.lock();
  305. InModeQueue.push(tnps);
  306. MSendQueue.unlock();
  307. }
  308. void TCPConnection::ServerSendQueuePushEnd(const uchar* data, sint32 size) {
  309. MSendQueue.lock();
  310. if (sendbuf == 0) {
  311. sendbuf = new uchar[size];
  312. sendbuf_size = size;
  313. sendbuf_used = 0;
  314. }
  315. else if (size > (sendbuf_size - sendbuf_used)) {
  316. sendbuf_size += size + 1024;
  317. uchar* tmp = new uchar[sendbuf_size];
  318. memcpy(tmp, sendbuf, sendbuf_used);
  319. safe_delete_array(sendbuf);
  320. sendbuf = tmp;
  321. }
  322. memcpy(&sendbuf[sendbuf_used], data, size);
  323. sendbuf_used += size;
  324. MSendQueue.unlock();
  325. }
  326. void TCPConnection::ServerSendQueuePushEnd(uchar** data, sint32 size) {
  327. MSendQueue.lock();
  328. if (sendbuf == 0) {
  329. sendbuf = *data;
  330. sendbuf_size = size;
  331. sendbuf_used = size;
  332. MSendQueue.unlock();
  333. *data = 0;
  334. return;
  335. }
  336. if (size > (sendbuf_size - sendbuf_used)) {
  337. sendbuf_size += size;
  338. uchar* tmp = new uchar[sendbuf_size];
  339. memcpy(tmp, sendbuf, sendbuf_used);
  340. safe_delete_array(sendbuf);
  341. sendbuf = tmp;
  342. }
  343. memcpy(&sendbuf[sendbuf_used], *data, size);
  344. sendbuf_used += size;
  345. MSendQueue.unlock();
  346. delete[] (TCPNetPacket_Struct*)*data;
  347. }
  348. void TCPConnection::ServerSendQueuePushFront(uchar* data, sint32 size) {
  349. MSendQueue.lock();
  350. if (sendbuf == 0) {
  351. sendbuf = new uchar[size];
  352. sendbuf_size = size;
  353. sendbuf_used = 0;
  354. }
  355. else if (size > (sendbuf_size - sendbuf_used)) {
  356. sendbuf_size += size;
  357. uchar* tmp = new uchar[sendbuf_size];
  358. memcpy(&tmp[size], sendbuf, sendbuf_used);
  359. safe_delete_array(sendbuf);
  360. sendbuf = tmp;
  361. }
  362. memcpy(sendbuf, data, size);
  363. sendbuf_used += size;
  364. MSendQueue.unlock();
  365. }
  366. bool TCPConnection::ServerSendQueuePop(uchar** data, sint32* size) {
  367. bool ret;
  368. if (!MSendQueue.trylock())
  369. return false;
  370. if (sendbuf) {
  371. *data = sendbuf;
  372. *size = sendbuf_used;
  373. sendbuf = 0;
  374. ret = true;
  375. }
  376. else {
  377. ret = false;
  378. }
  379. MSendQueue.unlock();
  380. return ret;
  381. }
  382. ServerPacket* TCPConnection::PopPacket() {
  383. ServerPacket* ret;
  384. if (!MOutQueueLock.trylock())
  385. return 0;
  386. ret = OutQueue.pop();
  387. MOutQueueLock.unlock();
  388. return ret;
  389. }
  390. char* TCPConnection::PopLine() {
  391. char* ret;
  392. if (!MOutQueueLock.trylock())
  393. return 0;
  394. ret = (char*) LineOutQueue.pop();
  395. MOutQueueLock.unlock();
  396. return ret;
  397. }
  398. void TCPConnection::OutQueuePush(ServerPacket* pack) {
  399. MOutQueueLock.lock();
  400. OutQueue.push(pack);
  401. MOutQueueLock.unlock();
  402. }
  403. void TCPConnection::LineOutQueuePush(char* line) {
  404. #if defined(GOTFRAGS) && 0
  405. if (strcmp(line, "**CRASHME**") == 0) {
  406. int i = 0;
  407. cout << (5 / i) << endl;
  408. }
  409. #endif
  410. if (strcmp(line, "**PACKETMODE**") == 0) {
  411. MSendQueue.lock();
  412. safe_delete_array(sendbuf);
  413. if (TCPMode == modeConsole)
  414. Send((const uchar*) "\0**PACKETMODE**\r", 16);
  415. TCPMode = modePacket;
  416. TCPNetPacket_Struct* tnps = 0;
  417. while ((tnps = InModeQueue.pop())) {
  418. SendPacket(tnps);
  419. safe_delete_array(tnps);
  420. }
  421. MSendQueue.unlock();
  422. safe_delete_array(line);
  423. return;
  424. }
  425. MOutQueueLock.lock();
  426. LineOutQueue.push(line);
  427. MOutQueueLock.unlock();
  428. }
  429. void TCPConnection::Disconnect(bool iSendRelayDisconnect) {
  430. if (connection_socket != INVALID_SOCKET && connection_socket != 0) {
  431. MState.lock();
  432. if (pState == TCPS_Connected || pState == TCPS_Disconnecting || pState == TCPS_Disconnected)
  433. SendData();
  434. pState = TCPS_Closing;
  435. MState.unlock();
  436. shutdown(connection_socket, 0x01);
  437. shutdown(connection_socket, 0x00);
  438. #ifdef WIN32
  439. closesocket(connection_socket);
  440. #else
  441. close(connection_socket);
  442. #endif
  443. connection_socket = 0;
  444. rIP = 0;
  445. rPort = 0;
  446. ClearBuffers();
  447. }
  448. SetState(TCPS_Ready);
  449. if (RelayLink) {
  450. RelayLink->RemoveRelay(this, iSendRelayDisconnect);
  451. RelayLink = 0;
  452. }
  453. }
  454. bool TCPConnection::GetAsyncConnect() {
  455. bool ret;
  456. MAsyncConnect.lock();
  457. ret = pAsyncConnect;
  458. MAsyncConnect.unlock();
  459. return ret;
  460. }
  461. bool TCPConnection::SetAsyncConnect(bool iValue) {
  462. bool ret;
  463. MAsyncConnect.lock();
  464. ret = pAsyncConnect;
  465. pAsyncConnect = iValue;
  466. MAsyncConnect.unlock();
  467. return ret;
  468. }
  469. void TCPConnection::AsyncConnect(char* irAddress, int16 irPort) {
  470. if (ConnectionType != Outgoing) {
  471. // If this code runs, we got serious problems
  472. // Crash and burn.
  473. ThrowError("TCPConnection::AsyncConnect() call on a Incomming connection object!");
  474. return;
  475. }
  476. if (GetState() != TCPS_Ready)
  477. return;
  478. MAsyncConnect.lock();
  479. if (pAsyncConnect) {
  480. MAsyncConnect.unlock();
  481. return;
  482. }
  483. pAsyncConnect = true;
  484. safe_delete_array(charAsyncConnect);
  485. charAsyncConnect = new char[strlen(irAddress) + 1];
  486. strcpy(charAsyncConnect, irAddress);
  487. rPort = irPort;
  488. MAsyncConnect.unlock();
  489. if (!pRunLoop) {
  490. pRunLoop = true;
  491. #ifdef WIN32
  492. _beginthread(TCPConnectionLoop, 0, this);
  493. #else
  494. pthread_t thread;
  495. pthread_create(&thread, NULL, TCPConnectionLoop, this);
  496. pthread_detach(thread);
  497. #endif
  498. }
  499. return;
  500. }
  501. void TCPConnection::AsyncConnect(int32 irIP, int16 irPort) {
  502. if (ConnectionType != Outgoing) {
  503. // If this code runs, we got serious problems
  504. // Crash and burn.
  505. ThrowError("TCPConnection::AsyncConnect() call on a Incomming connection object!");
  506. return;
  507. }
  508. if (GetState() != TCPS_Ready)
  509. return;
  510. MAsyncConnect.lock();
  511. if (pAsyncConnect) {
  512. MAsyncConnect.unlock();
  513. return;
  514. }
  515. pAsyncConnect = true;
  516. safe_delete(charAsyncConnect);
  517. rIP = irIP;
  518. rPort = irPort;
  519. MAsyncConnect.unlock();
  520. if (!pRunLoop) {
  521. pRunLoop = true;
  522. #ifdef WIN32
  523. _beginthread(TCPConnectionLoop, 0, this);
  524. #else
  525. pthread_t thread;
  526. pthread_create(&thread, NULL, TCPConnectionLoop, this);
  527. pthread_detach(thread);
  528. #endif
  529. }
  530. return;
  531. }
  532. bool TCPConnection::Connect(char* irAddress, int16 irPort, char* errbuf) {
  533. if (errbuf)
  534. errbuf[0] = 0;
  535. int32 tmpIP = ResolveIP(irAddress);
  536. if (!tmpIP) {
  537. if (errbuf) {
  538. #ifdef WIN32
  539. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Couldnt resolve hostname. Error: %i", WSAGetLastError());
  540. #else
  541. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Couldnt resolve hostname. Error #%i: %s", errno, strerror(errno));
  542. #endif
  543. }
  544. return false;
  545. }
  546. return Connect(tmpIP, irPort, errbuf);
  547. }
  548. bool TCPConnection::Connect(int32 in_ip, int16 in_port, char* errbuf) {
  549. if (errbuf)
  550. errbuf[0] = 0;
  551. if (ConnectionType != Outgoing) {
  552. // If this code runs, we got serious problems
  553. // Crash and burn.
  554. ThrowError("TCPConnection::Connect() call on a Incomming connection object!");
  555. return false;
  556. }
  557. MState.lock();
  558. if (pState == TCPS_Ready) {
  559. pState = TCPS_Connecting;
  560. }
  561. else {
  562. MState.unlock();
  563. SetAsyncConnect(false);
  564. return false;
  565. }
  566. MState.unlock();
  567. if (!pRunLoop) {
  568. pRunLoop = true;
  569. #ifdef WIN32
  570. _beginthread(TCPConnectionLoop, 0, this);
  571. #else
  572. pthread_t thread;
  573. pthread_create(&thread, NULL, TCPConnectionLoop, this);
  574. pthread_detach(thread);
  575. #endif
  576. }
  577. connection_socket = INVALID_SOCKET;
  578. struct sockaddr_in server_sin;
  579. // struct in_addr in;
  580. if ((connection_socket = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET || connection_socket == 0) {
  581. #ifdef WIN32
  582. if (errbuf)
  583. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Allocating socket failed. Error: %i", WSAGetLastError());
  584. #else
  585. if (errbuf)
  586. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Allocating socket failed. Error: %s", strerror(errno));
  587. #endif
  588. SetState(TCPS_Ready);
  589. SetAsyncConnect(false);
  590. return false;
  591. }
  592. server_sin.sin_family = AF_INET;
  593. server_sin.sin_addr.s_addr = in_ip;
  594. server_sin.sin_port = htons(in_port);
  595. // Establish a connection to the server socket.
  596. #ifdef WIN32
  597. if (connect(connection_socket, (PSOCKADDR) &server_sin, sizeof (server_sin)) == SOCKET_ERROR) {
  598. if (errbuf)
  599. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): connect() failed. Error: %i", WSAGetLastError());
  600. closesocket(connection_socket);
  601. connection_socket = 0;
  602. SetState(TCPS_Ready);
  603. SetAsyncConnect(false);
  604. return false;
  605. }
  606. #else
  607. if (connect(connection_socket, (struct sockaddr *) &server_sin, sizeof (server_sin)) == SOCKET_ERROR) {
  608. if (errbuf)
  609. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): connect() failed. Error: %s", strerror(errno));
  610. close(connection_socket);
  611. connection_socket = 0;
  612. SetState(TCPS_Ready);
  613. SetAsyncConnect(false);
  614. return false;
  615. }
  616. #endif
  617. int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
  618. setsockopt(connection_socket, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
  619. #ifdef WIN32
  620. unsigned long nonblocking = 1;
  621. ioctlsocket(connection_socket, FIONBIO, &nonblocking);
  622. #else
  623. fcntl(connection_socket, F_SETFL, O_NONBLOCK);
  624. #endif
  625. SetEcho(false);
  626. MSendQueue.lock();
  627. ClearBuffers();
  628. TCPMode = modePacket;
  629. MSendQueue.unlock();
  630. rIP = in_ip;
  631. rPort = in_port;
  632. SetState(TCPS_Connected);
  633. SetAsyncConnect(false);
  634. return true;
  635. }
  636. void TCPConnection::ClearBuffers() {
  637. LockMutex lock1(&MSendQueue);
  638. LockMutex lock2(&MOutQueueLock);
  639. LockMutex lock3(&MRunLoop);
  640. LockMutex lock4(&MState);
  641. safe_delete_array(recvbuf);
  642. safe_delete_array(sendbuf);
  643. ServerPacket* pack = 0;
  644. while ((pack = PopPacket()))
  645. safe_delete(pack);
  646. TCPNetPacket_Struct* tnps = 0;
  647. while ((tnps = InModeQueue.pop()))
  648. safe_delete(tnps);
  649. char* line = 0;
  650. while ((line = LineOutQueue.pop()))
  651. safe_delete_array(line);
  652. keepalive_timer->Start();
  653. timeout_timer->Start();
  654. }
  655. bool TCPConnection::CheckNetActive() {
  656. MState.lock();
  657. if (pState == TCPS_Connected || pState == TCPS_Disconnecting) {
  658. MState.unlock();
  659. return true;
  660. }
  661. MState.unlock();
  662. return false;
  663. }
  664. bool TCPConnection::Process() {
  665. char errbuf[TCPConnection_ErrorBufferSize];
  666. if (!CheckNetActive()) {
  667. if (ConnectionType == Outgoing) {
  668. if (GetAsyncConnect()) {
  669. if (charAsyncConnect)
  670. rIP = ResolveIP(charAsyncConnect);
  671. Connect(rIP, rPort);
  672. }
  673. }
  674. if (GetState() == TCPS_Disconnected) {
  675. Disconnect();
  676. return false;
  677. }
  678. else if (GetState() == TCPS_Connecting)
  679. return true;
  680. else
  681. return false;
  682. }
  683. if (!SendData(errbuf)) {
  684. struct in_addr in;
  685. in.s_addr = GetrIP();
  686. cout << inet_ntoa(in) << ":" << GetrPort() << ": " << errbuf << endl;
  687. return false;
  688. }
  689. if (!Connected())
  690. return false;
  691. if (!RecvData(errbuf)) {
  692. struct in_addr in;
  693. in.s_addr = GetrIP();
  694. cout << inet_ntoa(in) << ":" << GetrPort() << ": " << errbuf << endl;
  695. return false;
  696. }
  697. return true;
  698. }
  699. bool TCPConnection::RecvData(char* errbuf) {
  700. if (errbuf)
  701. errbuf[0] = 0;
  702. if (!Connected()) {
  703. return false;
  704. }
  705. int status = 0;
  706. if (recvbuf == 0) {
  707. recvbuf = new uchar[5120];
  708. recvbuf_size = 5120;
  709. recvbuf_used = 0;
  710. recvbuf_echo = 0;
  711. }
  712. else if ((recvbuf_size - recvbuf_used) < 2048) {
  713. uchar* tmpbuf = new uchar[recvbuf_size + 5120];
  714. memcpy(tmpbuf, recvbuf, recvbuf_used);
  715. recvbuf_size += 5120;
  716. safe_delete_array(recvbuf);
  717. recvbuf = tmpbuf;
  718. if (recvbuf_size >= MaxTCPReceiveBufferSize) {
  719. if (errbuf)
  720. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): recvbuf_size >= MaxTCPReceiveBufferSize");
  721. return false;
  722. }
  723. }
  724. status = recv(connection_socket, (char *) &recvbuf[recvbuf_used], (recvbuf_size - recvbuf_used), 0);
  725. if (status >= 1) {
  726. #if TCPN_LOG_RAW_DATA_IN >= 1
  727. struct in_addr in;
  728. in.s_addr = GetrIP();
  729. CoutTimestamp(true);
  730. cout << ": Read " << status << " bytes from network. (recvbuf_used = " << recvbuf_used << ") " << inet_ntoa(in) << ":" << GetrPort();
  731. if (pOldFormat)
  732. cout << " (OldFormat)";
  733. cout << endl;
  734. #if TCPN_LOG_RAW_DATA_IN == 2
  735. sint32 tmp = status;
  736. if (tmp > 32)
  737. tmp = 32;
  738. DumpPacket(&recvbuf[recvbuf_used], status);
  739. #elif TCPN_LOG_RAW_DATA_IN >= 3
  740. DumpPacket(&recvbuf[recvbuf_used], status);
  741. #endif
  742. #endif
  743. recvbuf_used += status;
  744. timeout_timer->Start();
  745. if (!ProcessReceivedData(errbuf))
  746. return false;
  747. }
  748. else if (status == SOCKET_ERROR) {
  749. #ifdef WIN32
  750. if (!(WSAGetLastError() == WSAEWOULDBLOCK)) {
  751. if (errbuf)
  752. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Error: %i", WSAGetLastError());
  753. return false;
  754. }
  755. #else
  756. if (!(errno == EWOULDBLOCK)) {
  757. if (errbuf)
  758. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Error: %s", strerror(errno));
  759. return false;
  760. }
  761. #endif
  762. }
  763. if ((TCPMode == modePacket || TCPMode == modeTransition) && timeout_timer->Check()) {
  764. if (errbuf)
  765. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Connection timeout");
  766. return false;
  767. }
  768. return true;
  769. }
  770. bool TCPConnection::GetEcho() {
  771. bool ret;
  772. MEcho.lock();
  773. ret = pEcho;
  774. MEcho.unlock();
  775. return ret;
  776. }
  777. void TCPConnection::SetEcho(bool iValue) {
  778. MEcho.lock();
  779. pEcho = iValue;
  780. MEcho.unlock();
  781. }
  782. bool TCPConnection::ProcessReceivedData(char* errbuf) {
  783. if (errbuf)
  784. errbuf[0] = 0;
  785. if (!recvbuf)
  786. return true;
  787. if (TCPMode == modePacket) {
  788. //if (pOldFormat)
  789. // return ProcessReceivedDataAsOldPackets(errbuf);
  790. //else
  791. return ProcessReceivedDataAsPackets(errbuf);
  792. }
  793. else {
  794. #if TCPN_DEBUG_Console >= 4
  795. if (recvbuf_used) {
  796. cout << "Starting Processing: recvbuf=" << recvbuf_used << endl;
  797. DumpPacket(recvbuf, recvbuf_used);
  798. }
  799. #endif
  800. for (int i=0; i < recvbuf_used; i++) {
  801. if (GetEcho() && i >= recvbuf_echo) {
  802. Send(&recvbuf[i], 1);
  803. recvbuf_echo = i + 1;
  804. }
  805. switch(recvbuf[i]) {
  806. case 0: { // 0 is the code for clear buffer
  807. if (i==0) {
  808. recvbuf_used--;
  809. recvbuf_echo--;
  810. memcpy(recvbuf, &recvbuf[1], recvbuf_used);
  811. i = -1;
  812. } else {
  813. if (i == recvbuf_used) {
  814. safe_delete_array(recvbuf);
  815. i = -1;
  816. }
  817. else {
  818. uchar* tmpdel = recvbuf;
  819. recvbuf = new uchar[recvbuf_size];
  820. memcpy(recvbuf, &tmpdel[i+1], recvbuf_used-i);
  821. recvbuf_used -= i + 1;
  822. recvbuf_echo -= i + 1;
  823. safe_delete(tmpdel);
  824. i = -1;
  825. }
  826. }
  827. #if TCPN_DEBUG_Console >= 5
  828. cout << "Removed 0x00" << endl;
  829. if (recvbuf_used) {
  830. cout << "recvbuf left: " << recvbuf_used << endl;
  831. DumpPacket(recvbuf, recvbuf_used);
  832. }
  833. else
  834. cout << "recbuf left: None" << endl;
  835. #endif
  836. break;
  837. }
  838. case 10:
  839. case 13: // newline marker
  840. {
  841. if (i==0) { // empty line
  842. recvbuf_used--;
  843. recvbuf_echo--;
  844. memcpy(recvbuf, &recvbuf[1], recvbuf_used);
  845. i = -1;
  846. } else {
  847. char* line = new char[i+1];
  848. memset(line, 0, i+1);
  849. memcpy(line, recvbuf, i);
  850. #if TCPN_DEBUG_Console >= 3
  851. cout << "Line Out: " << endl;
  852. DumpPacket((uchar*) line, i);
  853. #endif
  854. //line[i] = 0;
  855. uchar* tmpdel = recvbuf;
  856. recvbuf = new uchar[recvbuf_size];
  857. recvbuf_used -= i+1;
  858. recvbuf_echo -= i+1;
  859. memcpy(recvbuf, &tmpdel[i+1], recvbuf_used);
  860. #if TCPN_DEBUG_Console >= 5
  861. cout << "i+1=" << i+1 << endl;
  862. if (recvbuf_used) {
  863. cout << "recvbuf left: " << recvbuf_used << endl;
  864. DumpPacket(recvbuf, recvbuf_used);
  865. }
  866. else
  867. cout << "recbuf left: None" << endl;
  868. #endif
  869. safe_delete(tmpdel);
  870. if (strlen(line) > 0)
  871. LineOutQueuePush(line);
  872. else
  873. safe_delete_array(line);
  874. if (TCPMode == modePacket) {
  875. return ProcessReceivedDataAsPackets(errbuf);
  876. }
  877. i = -1;
  878. }
  879. break;
  880. }
  881. case 8: // backspace
  882. {
  883. if (i==0) { // nothin to backspace
  884. recvbuf_used--;
  885. recvbuf_echo--;
  886. memcpy(recvbuf, &recvbuf[1], recvbuf_used);
  887. i = -1;
  888. } else {
  889. uchar* tmpdel = recvbuf;
  890. recvbuf = new uchar[recvbuf_size];
  891. memcpy(recvbuf, tmpdel, i-1);
  892. memcpy(&recvbuf[i-1], &tmpdel[i+1], recvbuf_used-i);
  893. recvbuf_used -= 2;
  894. recvbuf_echo -= 2;
  895. safe_delete(tmpdel);
  896. i -= 2;
  897. }
  898. break;
  899. }
  900. }
  901. }
  902. if (recvbuf_used < 0)
  903. safe_delete_array(recvbuf);
  904. }
  905. return true;
  906. }
  907. bool TCPConnection::ProcessReceivedDataAsPackets(char* errbuf) {
  908. if (errbuf)
  909. errbuf[0] = 0;
  910. sint32 base = 0;
  911. sint32 size = 7;
  912. uchar* buffer;
  913. ServerPacket* pack = 0;
  914. while ((recvbuf_used - base) >= size) {
  915. TCPNetPacket_Struct* tnps = (TCPNetPacket_Struct*) &recvbuf[base];
  916. buffer = tnps->buffer;
  917. size = tnps->size;
  918. if (size >= MaxTCPReceiveBufferSize) {
  919. #if TCPN_DEBUG_Memory >= 1
  920. cout << "TCPConnection[" << GetID() << "]::ProcessReceivedDataAsPackets(): size[" << size << "] >= MaxTCPReceiveBufferSize" << endl;
  921. #endif
  922. if (errbuf)
  923. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(): size >= MaxTCPReceiveBufferSize");
  924. return false;
  925. }
  926. if ((recvbuf_used - base) >= size) {
  927. // ok, we got enough data to make this packet!
  928. safe_delete(pack);
  929. pack = new ServerPacket;
  930. pack->size = size - sizeof(TCPNetPacket_Struct);
  931. // read headers
  932. pack->opcode = tnps->opcode;
  933. if (tnps->flags.compressed) {
  934. pack->compressed = true;
  935. pack->InflatedSize = *((sint32*)buffer);
  936. pack->size -= 4;
  937. buffer += 4;
  938. }
  939. if (tnps->flags.destination) {
  940. pack->destination = *((sint32*)buffer);
  941. pack->size -= 4;
  942. buffer += 4;
  943. }
  944. // end read headers
  945. if (pack->size > 0) {
  946. if (tnps->flags.compressed) {
  947. // Lets decompress the packet here
  948. pack->compressed = false;
  949. if(pack->InflatedSize < MaxTCPReceiveBufferSize)
  950. {
  951. pack->pBuffer = new uchar[pack->InflatedSize];
  952. pack->size = InflatePacket(buffer, pack->size, pack->pBuffer, pack->InflatedSize);
  953. }
  954. else
  955. {
  956. cout << "Invalid inflated packet." << endl;
  957. safe_delete(pack);
  958. return false;
  959. }
  960. }
  961. else {
  962. pack->pBuffer = new uchar[pack->size];
  963. memcpy(pack->pBuffer, buffer, pack->size);
  964. }
  965. }
  966. if (pack->opcode == 0) {
  967. if (pack->size) {
  968. #if TCPN_DEBUG >= 2
  969. cout << "Received TCP Network layer packet" << endl;
  970. #endif
  971. ProcessNetworkLayerPacket(pack);
  972. }
  973. #if TCPN_DEBUG >= 5
  974. else {
  975. cout << "Received TCP keepalive packet. (opcode=0)" << endl;
  976. }
  977. #endif
  978. }
  979. else {
  980. #if TCPN_LOG_PACKETS >= 1
  981. if (pack && pack->opcode != 0) {
  982. struct in_addr in;
  983. in.s_addr = GetrIP();
  984. CoutTimestamp(true);
  985. cout << ": Logging incoming TCP packet. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
  986. #if TCPN_LOG_PACKETS == 2
  987. if (pack->size >= 32)
  988. DumpPacket(pack->pBuffer, 32);
  989. else
  990. DumpPacket(pack);
  991. #endif
  992. #if TCPN_LOG_PACKETS >= 3
  993. DumpPacket(pack);
  994. #endif
  995. }
  996. #endif
  997. if (RelayServer && Server && pack->destination) {
  998. TCPConnection* con = Server->GetConnection(pack->destination);
  999. if (!con) {
  1000. #if TCPN_DEBUG >= 1
  1001. cout << "Error relaying packet: con = 0" << endl;
  1002. #endif
  1003. }
  1004. else{
  1005. con->OutQueuePush(pack);
  1006. pack = 0;
  1007. }
  1008. }
  1009. else{
  1010. OutQueuePush(pack);
  1011. pack = 0;
  1012. }
  1013. }
  1014. base += size;
  1015. size = 7;
  1016. }
  1017. }
  1018. safe_delete(pack);
  1019. if (base != 0) {
  1020. if (base >= recvbuf_used) {
  1021. safe_delete_array(recvbuf);
  1022. }
  1023. else {
  1024. uchar* tmpbuf = new uchar[recvbuf_size - base];
  1025. memcpy(tmpbuf, &recvbuf[base], recvbuf_used - base);
  1026. safe_delete_array(recvbuf);
  1027. recvbuf = tmpbuf;
  1028. recvbuf_used -= base;
  1029. recvbuf_size -= base;
  1030. }
  1031. }
  1032. return true;
  1033. }
  1034. bool TCPConnection::ProcessReceivedDataAsOldPackets(char* errbuf) {
  1035. sint32 base = 0;
  1036. sint32 size = 4;
  1037. uchar* buffer;
  1038. ServerPacket* pack = 0;
  1039. while ((recvbuf_used - base) >= size) {
  1040. buffer = &recvbuf[base];
  1041. memcpy(&size, &buffer[2], 2);
  1042. if (size >= MaxTCPReceiveBufferSize) {
  1043. #if TCPN_DEBUG_Memory >= 1
  1044. cout << "TCPConnection[" << GetID() << "]::ProcessReceivedDataAsPackets(): size[" << size << "] >= MaxTCPReceiveBufferSize" << endl;
  1045. #endif
  1046. if (errbuf)
  1047. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(): size >= MaxTCPReceiveBufferSize");
  1048. return false;
  1049. }
  1050. if ((recvbuf_used - base) >= size) {
  1051. // ok, we got enough data to make this packet!
  1052. pack = new ServerPacket;
  1053. memcpy(&pack->opcode, &buffer[0], 2);
  1054. pack->size = size - 4;
  1055. LogWrite(MISC__TODO, 1, "TODO", "Checksum or size check or something similar\n\t(%s, function: %s, line #: %i)", __FILE__, __FUNCTION__, __LINE__);
  1056. /*
  1057. if () { // TODO: Checksum or size check or something similar
  1058. // Datastream corruption, get the hell outta here!
  1059. delete pack;
  1060. return false;
  1061. }
  1062. */
  1063. if (pack->size > 0) {
  1064. pack->pBuffer = new uchar[pack->size];
  1065. memcpy(pack->pBuffer, &buffer[4], pack->size);
  1066. }
  1067. if (pack->opcode == 0) {
  1068. // keepalive, no need to process
  1069. safe_delete(pack);
  1070. }
  1071. else {
  1072. #if TCPN_LOG_PACKETS >= 1
  1073. if (pack && pack->opcode != 0) {
  1074. struct in_addr in;
  1075. in.s_addr = GetrIP();
  1076. CoutTimestamp(true);
  1077. cout << ": Logging incoming TCP OldPacket. OPCode: 0x" << hex << setw(4) << setfill('0') << pack->opcode << dec << ", size: " << setw(5) << setfill(' ') << pack->size << " " << inet_ntoa(in) << ":" << GetrPort() << endl;
  1078. #if TCPN_LOG_PACKETS == 2
  1079. if (pack->size >= 32)
  1080. DumpPacket(pack->pBuffer, 32);
  1081. else
  1082. DumpPacket(pack);
  1083. #endif
  1084. #if TCPN_LOG_PACKETS >= 3
  1085. DumpPacket(pack);
  1086. #endif
  1087. }
  1088. #endif
  1089. OutQueuePush(pack);
  1090. }
  1091. base += size;
  1092. size = 4;
  1093. }
  1094. }
  1095. if (base != 0) {
  1096. if (base >= recvbuf_used) {
  1097. safe_delete_array(recvbuf);
  1098. }
  1099. else {
  1100. uchar* tmpbuf = new uchar[recvbuf_size - base];
  1101. memcpy(tmpbuf, &recvbuf[base], recvbuf_used - base);
  1102. safe_delete_array(recvbuf);
  1103. recvbuf = tmpbuf;
  1104. recvbuf_used -= base;
  1105. recvbuf_size -= base;
  1106. }
  1107. }
  1108. return true;
  1109. }
  1110. void TCPConnection::ProcessNetworkLayerPacket(ServerPacket* pack) {
  1111. int8 opcode = pack->pBuffer[0];
  1112. int8* data = &pack->pBuffer[1];
  1113. switch (opcode) {
  1114. case 0: {
  1115. break;
  1116. }
  1117. case 1: { // Switch to RelayServer mode
  1118. if (pack->size != 1) {
  1119. SendNetErrorPacket("New RelayClient: wrong size, expected 1");
  1120. break;
  1121. }
  1122. if (RelayServer) {
  1123. SendNetErrorPacket("Switch to RelayServer mode when already in RelayServer mode");
  1124. break;
  1125. }
  1126. if (RemoteID) {
  1127. SendNetErrorPacket("Switch to RelayServer mode by a Relay Client");
  1128. break;
  1129. }
  1130. if (ConnectionType != Incomming) {
  1131. SendNetErrorPacket("Switch to RelayServer mode on outgoing connection");
  1132. break;
  1133. }
  1134. #if TCPC_DEBUG >= 3
  1135. struct in_addr in;
  1136. in.s_addr = GetrIP();
  1137. cout << "Switching to RelayServer mode: " << inet_ntoa(in) << ":" << GetPort() << endl;
  1138. #endif
  1139. RelayServer = true;
  1140. break;
  1141. }
  1142. case 2: { // New Relay Client
  1143. if (!RelayServer) {
  1144. SendNetErrorPacket("New RelayClient when not in RelayServer mode");
  1145. break;
  1146. }
  1147. if (pack->size != 11) {
  1148. SendNetErrorPacket("New RelayClient: wrong size, expected 11");
  1149. break;
  1150. }
  1151. if (ConnectionType != Incomming) {
  1152. SendNetErrorPacket("New RelayClient: illegal on outgoing connection");
  1153. break;
  1154. }
  1155. TCPConnection* con = new TCPConnection(Server, this, *((int32*) data), *((int32*) &data[4]), *((int16*) &data[8]));
  1156. Server->AddConnection(con);
  1157. RelayCount++;
  1158. break;
  1159. }
  1160. case 3: { // Delete Relay Client
  1161. if (!RelayServer) {
  1162. SendNetErrorPacket("Delete RelayClient when not in RelayServer mode");
  1163. break;
  1164. }
  1165. if (pack->size != 5) {
  1166. SendNetErrorPacket("Delete RelayClient: wrong size, expected 5");
  1167. break;
  1168. }
  1169. TCPConnection* con = Server->GetConnection(*((int32*)data));
  1170. if (con) {
  1171. if (ConnectionType == Incomming) {
  1172. if (con->GetRelayLink() != this) {
  1173. SendNetErrorPacket("Delete RelayClient: RelayLink != this");
  1174. break;
  1175. }
  1176. }
  1177. con->Disconnect(false);
  1178. }
  1179. break;
  1180. }
  1181. case 255: {
  1182. #if TCPC_DEBUG >= 1
  1183. struct in_addr in;
  1184. in.s_addr = GetrIP();
  1185. cout "Received NetError: '";
  1186. if (pack->size > 1)
  1187. cout << (char*) data;
  1188. cout << "': " << inet_ntoa(in) << ":" << GetPort() << endl;
  1189. #endif
  1190. break;
  1191. }
  1192. }
  1193. }
  1194. void TCPConnection::SendNetErrorPacket(const char* reason) {
  1195. #if TCPC_DEBUG >= 1
  1196. struct in_addr in;
  1197. in.s_addr = GetrIP();
  1198. cout "NetError: '";
  1199. if (reason)
  1200. cout << reason;
  1201. cout << "': " << inet_ntoa(in) << ":" << GetPort() << endl;
  1202. #endif
  1203. ServerPacket* pack = new ServerPacket(0);
  1204. pack->size = 1;
  1205. if (reason)
  1206. pack->size += strlen(reason) + 1;
  1207. pack->pBuffer = new uchar[pack->size];
  1208. memset(pack->pBuffer, 0, pack->size);
  1209. pack->pBuffer[0] = 255;
  1210. strcpy((char*) &pack->pBuffer[1], reason);
  1211. SendPacket(pack);
  1212. safe_delete(pack);
  1213. }
  1214. void TCPConnection::RemoveRelay(TCPConnection* relay, bool iSendRelayDisconnect) {
  1215. if (iSendRelayDisconnect) {
  1216. ServerPacket* pack = new ServerPacket(0, 5);
  1217. pack->pBuffer[0] = 3;
  1218. *((int32*) &pack->pBuffer[1]) = relay->GetRemoteID();
  1219. SendPacket(pack);
  1220. safe_delete(pack);
  1221. }
  1222. RelayCount--;
  1223. }
  1224. bool TCPConnection::SendData(char* errbuf) {
  1225. if (errbuf)
  1226. errbuf[0] = 0;
  1227. /************ Get first send packet on queue and send it! ************/
  1228. uchar* data = 0;
  1229. sint32 size = 0;
  1230. int status = 0;
  1231. if (ServerSendQueuePop(&data, &size)) {
  1232. #ifdef WIN32
  1233. status = send(connection_socket, (const char *) data, size, 0);
  1234. #else
  1235. status = send(connection_socket, data, size, MSG_NOSIGNAL);
  1236. if(errno==EPIPE) status = SOCKET_ERROR;
  1237. #endif
  1238. if (status >= 1) {
  1239. #if TCPN_LOG_RAW_DATA_OUT >= 1
  1240. struct in_addr in;
  1241. in.s_addr = GetrIP();
  1242. CoutTimestamp(true);
  1243. cout << ": Wrote " << status << " bytes to network. " << inet_ntoa(in) << ":" << GetrPort();
  1244. if (pOldFormat)
  1245. cout << " (OldFormat)";
  1246. cout << endl;
  1247. #if TCPN_LOG_RAW_DATA_OUT == 2
  1248. sint32 tmp = status;
  1249. if (tmp > 32)
  1250. tmp = 32;
  1251. DumpPacket(data, status);
  1252. #elif TCPN_LOG_RAW_DATA_OUT >= 3
  1253. DumpPacket(data, status);
  1254. #endif
  1255. #endif
  1256. keepalive_timer->Start();
  1257. if (status < (signed)size) {
  1258. #if TCPN_LOG_RAW_DATA_OUT >= 1
  1259. struct in_addr in;
  1260. in.s_addr = GetrIP();
  1261. CoutTimestamp(true);
  1262. cout << ": Pushed " << (size - status) << " bytes back onto the send queue. " << inet_ntoa(in) << ":" << GetrPort();
  1263. if (pOldFormat)
  1264. cout << " (OldFormat)";
  1265. cout << endl;
  1266. #endif
  1267. // If there's network congestion, the number of bytes sent can be less than
  1268. // what we tried to give it... Push the extra back on the queue for later
  1269. ServerSendQueuePushFront(&data[status], size - status);
  1270. }
  1271. else if (status > (signed)size) {
  1272. ThrowError("TCPConnection::SendData(): WTF! status > size");
  1273. return false;
  1274. }
  1275. // else if (status == size) {}
  1276. }
  1277. else {
  1278. ServerSendQueuePushFront(data, size);
  1279. }
  1280. safe_delete_array(data);
  1281. if (status == SOCKET_ERROR) {
  1282. #ifdef WIN32
  1283. if (WSAGetLastError() != WSAEWOULDBLOCK)
  1284. #else
  1285. if (errno != EWOULDBLOCK)
  1286. #endif
  1287. {
  1288. if (errbuf) {
  1289. #ifdef WIN32
  1290. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::SendData(): send(): Errorcode: %i", WSAGetLastError());
  1291. #else
  1292. snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::SendData(): send(): Errorcode: %s", strerror(errno));
  1293. #endif
  1294. }
  1295. return false;
  1296. }
  1297. }
  1298. }
  1299. if (TCPMode == modePacket && keepalive_timer->Check()) {
  1300. ServerPacket* pack = new ServerPacket(0, 0);
  1301. SendPacket(pack);
  1302. safe_delete(pack);
  1303. #if TCPN_DEBUG >= 5
  1304. cout << "Sending TCP keepalive packet. (timeout=" << timeout_timer->GetRemainingTime() << " remaining)" << endl;
  1305. #endif
  1306. }
  1307. return true;
  1308. }
  1309. ThreadReturnType TCPConnectionLoop(void* tmp) {
  1310. #ifdef WIN32
  1311. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  1312. #endif
  1313. if (tmp == 0) {
  1314. ThrowError("TCPConnectionLoop(): tmp = 0!");
  1315. THREAD_RETURN(NULL);
  1316. }
  1317. TCPConnection* tcpc = (TCPConnection*) tmp;
  1318. tcpc->MLoopRunning.lock();
  1319. while (tcpc->RunLoop()) {
  1320. Sleep(LOOP_GRANULARITY);
  1321. if (tcpc->GetState() != TCPS_Ready) {
  1322. if (!tcpc->Process()) {
  1323. tcpc->Disconnect();
  1324. }
  1325. }
  1326. else if (tcpc->GetAsyncConnect()) {
  1327. if (tcpc->charAsyncConnect)
  1328. tcpc->Connect(tcpc->charAsyncConnect, tcpc->GetrPort());
  1329. else
  1330. tcpc->Connect(tcpc->GetrIP(), tcpc->GetrPort());
  1331. tcpc->SetAsyncConnect(false);
  1332. }
  1333. else
  1334. Sleep(10);
  1335. }
  1336. tcpc->MLoopRunning.unlock();
  1337. THREAD_RETURN(NULL);
  1338. }
  1339. bool TCPConnection::RunLoop() {
  1340. bool ret;
  1341. MRunLoop.lock();
  1342. ret = pRunLoop;
  1343. MRunLoop.unlock();
  1344. return ret;
  1345. }
  1346. TCPServer::TCPServer(int16 in_port, bool iOldFormat) {
  1347. NextID = 1;
  1348. pPort = in_port;
  1349. sock = 0;
  1350. pOldFormat = iOldFormat;
  1351. list = new LinkedList<TCPConnection*>;
  1352. pRunLoop = true;
  1353. #ifdef WIN32
  1354. _beginthread(TCPServerLoop, 0, this);
  1355. #else
  1356. pthread_t thread;
  1357. pthread_create(&thread, NULL, &TCPServerLoop, this);
  1358. pthread_detach(thread);
  1359. #endif
  1360. }
  1361. TCPServer::~TCPServer() {
  1362. MRunLoop.lock();
  1363. pRunLoop = false;
  1364. MRunLoop.unlock();
  1365. MLoopRunning.lock();
  1366. MLoopRunning.unlock();
  1367. while (NewQueue.pop()); // the objects are deleted with the list, clear this queue so it doesnt try to delete them again
  1368. safe_delete(list);
  1369. }
  1370. bool TCPServer::RunLoop() {
  1371. bool ret;
  1372. MRunLoop.lock();
  1373. ret = pRunLoop;
  1374. MRunLoop.unlock();
  1375. return ret;
  1376. }
  1377. ThreadReturnType TCPServerLoop(void* tmp) {
  1378. #ifdef WIN32
  1379. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  1380. #endif
  1381. if (tmp == 0) {
  1382. ThrowError("TCPServerLoop(): tmp = 0!");
  1383. THREAD_RETURN(NULL);
  1384. }
  1385. TCPServer* tcps = (TCPServer*) tmp;
  1386. tcps->MLoopRunning.lock();
  1387. while (tcps->RunLoop()) {
  1388. Sleep(SERVER_LOOP_GRANULARITY);
  1389. tcps->Process();
  1390. }
  1391. tcps->MLoopRunning.unlock();
  1392. THREAD_RETURN(NULL);
  1393. }
  1394. void TCPServer::Process() {
  1395. CheckInQueue();
  1396. ListenNewConnections();
  1397. LinkedListIterator<TCPConnection*> iterator(*list);
  1398. iterator.Reset();
  1399. while(iterator.MoreElements()) {
  1400. if (iterator.GetData()->IsFree() && (!iterator.GetData()->CheckNetActive())) {
  1401. #if _DEBUG
  1402. LogWrite(NET__DEBUG, 0, "Net", "EQStream Connection deleted.");
  1403. #endif
  1404. iterator.RemoveCurrent();
  1405. }
  1406. else {
  1407. if (!iterator.GetData()->Process())
  1408. iterator.GetData()->Disconnect();
  1409. iterator.Advance();
  1410. }
  1411. }
  1412. }
  1413. void TCPServer::ListenNewConnections() {
  1414. SOCKET tmpsock;
  1415. struct sockaddr_in from;
  1416. struct in_addr in;
  1417. unsigned int fromlen;
  1418. TCPConnection* con;
  1419. from.sin_family = AF_INET;
  1420. fromlen = sizeof(from);
  1421. LockMutex lock(&MSock);
  1422. if (!sock)
  1423. return;
  1424. // Check for pending connects
  1425. #ifdef WIN32
  1426. unsigned long nonblocking = 1;
  1427. while ((tmpsock = accept(sock, (struct sockaddr*) &from, (int *) &fromlen)) != INVALID_SOCKET) {
  1428. ioctlsocket (tmpsock, FIONBIO, &nonblocking);
  1429. #else
  1430. while ((tmpsock = accept(sock, (struct sockaddr*) &from, &fromlen)) != INVALID_SOCKET) {
  1431. fcntl(tmpsock, F_SETFL, O_NONBLOCK);
  1432. #endif
  1433. int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
  1434. setsockopt(tmpsock, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
  1435. in.s_addr = from.sin_addr.s_addr;
  1436. // New TCP connection
  1437. con = new TCPConnection(this, tmpsock, in.s_addr, ntohs(from.sin_port), pOldFormat);
  1438. #if TCPN_DEBUG >= 1
  1439. cout << "New TCP connection: " << inet_ntoa(in) << ":" << con->GetrPort() << endl;
  1440. #endif
  1441. AddConnection(con);
  1442. }
  1443. }
  1444. bool TCPServer::Open(int16 in_port, char* errbuf) {
  1445. if (errbuf)
  1446. errbuf[0] = 0;
  1447. LockMutex lock(&MSock);
  1448. if (sock != 0) {
  1449. if (errbuf)
  1450. snprintf(errbuf, TCPConnection_ErrorBufferSize, "Listening socket already open");
  1451. return false;
  1452. }
  1453. if (in_port != 0) {
  1454. pPort = in_port;
  1455. }
  1456. #ifdef WIN32
  1457. SOCKADDR_IN address;
  1458. unsigned long nonblocking = 1;
  1459. #else
  1460. struct sockaddr_in address;
  1461. #endif
  1462. int reuse_addr = 1;
  1463. // Setup internet address information.
  1464. // This is used with the bind() call
  1465. memset((char *) &address, 0, sizeof(address));
  1466. address.sin_family = AF_INET;
  1467. address.sin_port = htons(pPort);
  1468. address.sin_addr.s_addr = htonl(INADDR_ANY);
  1469. // Setting up TCP port for new TCP connections
  1470. sock = socket(AF_INET, SOCK_STREAM, 0);
  1471. if (sock == INVALID_SOCKET) {
  1472. if (errbuf)
  1473. snprintf(errbuf, TCPConnection_ErrorBufferSize, "socket(): INVALID_SOCKET");
  1474. return false;
  1475. }
  1476. // Quag: dont think following is good stuff for TCP, good for UDP
  1477. // Mis: SO_REUSEADDR shouldn't be a problem for tcp--allows you to restart
  1478. // without waiting for conns in TIME_WAIT to die
  1479. setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse_addr, sizeof(reuse_addr));
  1480. if (bind(sock, (struct sockaddr *) &address, sizeof(address)) < 0) {
  1481. #ifdef WIN32
  1482. closesocket(sock);
  1483. #else
  1484. close(sock);
  1485. #endif
  1486. sock = 0;
  1487. if (errbuf)
  1488. sprintf(errbuf, "bind(): <0");
  1489. return false;
  1490. }
  1491. int bufsize = 64 * 1024; // 64kbyte recieve buffer, up from default of 8k
  1492. setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*) &bufsize, sizeof(bufsize));
  1493. #ifdef WIN32
  1494. ioctlsocket (sock, FIONBIO, &nonblocking);
  1495. #else
  1496. fcntl(sock, F_SETFL, O_NONBLOCK);
  1497. #endif
  1498. if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
  1499. #ifdef WIN32
  1500. closesocket(sock);
  1501. if (errbuf)
  1502. snprintf(errbuf, TCPConnection_ErrorBufferSize, "listen() failed, Error: %d", WSAGetLastError());
  1503. #else
  1504. close(sock);
  1505. if (errbuf)
  1506. snprintf(errbuf, TCPConnection_ErrorBufferSize, "listen() failed, Error: %s", strerror(errno));
  1507. #endif
  1508. sock = 0;
  1509. return false;
  1510. }
  1511. return true;
  1512. }
  1513. void TCPServer::Close() {
  1514. LockMutex lock(&MSock);
  1515. if (sock) {
  1516. #ifdef WIN32
  1517. closesocket(sock);
  1518. #else
  1519. close(sock);
  1520. #endif
  1521. }
  1522. sock = 0;
  1523. }
  1524. bool TCPServer::IsOpen() {
  1525. MSock.lock();
  1526. bool ret = (bool) (sock != 0);
  1527. MSock.unlock();
  1528. return ret;
  1529. }
  1530. TCPConnection* TCPServer::NewQueuePop() {
  1531. TCPConnection* ret;
  1532. MNewQueue.lock();
  1533. ret = NewQueue.pop();
  1534. MNewQueue.unlock();
  1535. return ret;
  1536. }
  1537. void TCPServer::AddConnection(TCPConnection* con) {
  1538. list->Append(con);
  1539. MNewQueue.lock();
  1540. NewQueue.push(con);
  1541. MNewQueue.unlock();
  1542. }
  1543. TCPConnection* TCPServer::GetConnection(int32 iID) {
  1544. LinkedListIterator<TCPConnection*> iterator(*list);
  1545. iterator.Reset();
  1546. while(iterator.MoreElements()) {
  1547. if (iterator.GetData()->GetID() == iID)
  1548. return iterator.GetData();
  1549. iterator.Advance();
  1550. }
  1551. return 0;
  1552. }
  1553. void TCPServer::SendPacket(ServerPacket* pack) {
  1554. TCPConnection::TCPNetPacket_Struct* tnps = TCPConnection::MakePacket(pack);
  1555. SendPacket(&tnps);
  1556. }
  1557. void TCPServer::SendPacket(TCPConnection::TCPNetPacket_Struct** tnps) {
  1558. MInQueue.lock();
  1559. InQueue.push(*tnps);
  1560. MInQueue.unlock();
  1561. tnps = 0;
  1562. }
  1563. void TCPServer::CheckInQueue() {
  1564. LinkedListIterator<TCPConnection*> iterator(*list);
  1565. TCPConnection::TCPNetPacket_Struct* tnps = 0;
  1566. while (( tnps = InQueuePop() )) {
  1567. iterator.Reset();
  1568. while(iterator.MoreElements()) {
  1569. if (iterator.GetData()->GetMode() != modeConsole && iterator.GetData()->GetRemoteID() == 0)
  1570. iterator.GetData()->SendPacket(tnps);
  1571. iterator.Advance();
  1572. }
  1573. safe_delete(tnps);
  1574. }
  1575. }
  1576. TCPConnection::TCPNetPacket_Struct* TCPServer::InQueuePop() {
  1577. TCPConnection::TCPNetPacket_Struct* ret;
  1578. MInQueue.lock();
  1579. ret = InQueue.pop();
  1580. MInQueue.unlock();
  1581. return ret;
  1582. }